diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 045ae6c03f..d970fde9e8 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -44,6 +44,7 @@ from .receipts import ReceiptsStore
from .search import SearchStore
from .tags import TagsStore
from .account_data import AccountDataStore
+from .openid import OpenIdStore
from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator
@@ -81,7 +82,8 @@ class DataStore(RoomMemberStore, RoomStore,
SearchStore,
TagsStore,
AccountDataStore,
- EventPushActionsStore
+ EventPushActionsStore,
+ OpenIdStore,
):
def __init__(self, db_conn, hs):
@@ -114,6 +116,7 @@ class DataStore(RoomMemberStore, RoomStore,
self._state_groups_id_gen = StreamIdGenerator(db_conn, "state_groups", "id")
self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id")
+ self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
self._push_rules_stream_id_gen = ChainedIdGenerator(
diff --git a/synapse/storage/openid.py b/synapse/storage/openid.py
new file mode 100644
index 0000000000..5dabb607bd
--- /dev/null
+++ b/synapse/storage/openid.py
@@ -0,0 +1,32 @@
+from ._base import SQLBaseStore
+
+
+class OpenIdStore(SQLBaseStore):
+ def insert_open_id_token(self, token, ts_valid_until_ms, user_id):
+ return self._simple_insert(
+ table="open_id_tokens",
+ values={
+ "token": token,
+ "ts_valid_until_ms": ts_valid_until_ms,
+ "user_id": user_id,
+ },
+ desc="insert_open_id_token"
+ )
+
+ def get_user_id_for_open_id_token(self, token, ts_now_ms):
+ def get_user_id_for_token_txn(txn):
+ sql = (
+ "SELECT user_id FROM open_id_tokens"
+ " WHERE token = ? AND ? <= ts_valid_until_ms"
+ )
+
+ txn.execute(sql, (token, ts_now_ms))
+
+ rows = txn.fetchall()
+ if not rows:
+ return None
+ else:
+ return rows[0][0]
+ return self.runInteraction(
+ "get_user_id_for_token", get_user_id_for_token_txn
+ )
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 57f14fd12b..c8487c8838 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 31
+SCHEMA_VERSION = 32
dir_path = os.path.abspath(os.path.dirname(__file__))
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 70aa64fb31..26933e593a 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -23,6 +23,7 @@ from .engines import PostgresEngine, Sqlite3Engine
import collections
import logging
+import ujson as json
logger = logging.getLogger(__name__)
@@ -221,3 +222,20 @@ class RoomStore(SQLBaseStore):
aliases.extend(e.content['aliases'])
defer.returnValue((name, aliases))
+
+ def add_event_report(self, room_id, event_id, user_id, reason, content,
+ received_ts):
+ next_id = self._event_reports_id_gen.get_next()
+ return self._simple_insert(
+ table="event_reports",
+ values={
+ "id": next_id,
+ "received_ts": received_ts,
+ "room_id": room_id,
+ "event_id": event_id,
+ "user_id": user_id,
+ "reason": reason,
+ "content": json.dumps(content),
+ },
+ desc="add_event_report"
+ )
diff --git a/synapse/storage/schema/delta/32/openid.sql b/synapse/storage/schema/delta/32/openid.sql
new file mode 100644
index 0000000000..36f37b11c8
--- /dev/null
+++ b/synapse/storage/schema/delta/32/openid.sql
@@ -0,0 +1,9 @@
+
+CREATE TABLE open_id_tokens (
+ token TEXT NOT NULL PRIMARY KEY,
+ ts_valid_until_ms bigint NOT NULL,
+ user_id TEXT NOT NULL,
+ UNIQUE (token)
+);
+
+CREATE index open_id_tokens_ts_valid_until_ms ON open_id_tokens(ts_valid_until_ms);
diff --git a/synapse/storage/schema/delta/32/reports.sql b/synapse/storage/schema/delta/32/reports.sql
new file mode 100644
index 0000000000..d13609776f
--- /dev/null
+++ b/synapse/storage/schema/delta/32/reports.sql
@@ -0,0 +1,25 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE event_reports(
+ id BIGINT NOT NULL PRIMARY KEY,
+ received_ts BIGINT NOT NULL,
+ room_id TEXT NOT NULL,
+ event_id TEXT NOT NULL,
+ user_id TEXT NOT NULL,
+ reason TEXT,
+ content TEXT
+);
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index d338dfcf0a..6c7481a728 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -16,16 +16,56 @@
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached
+from twisted.internet import defer, reactor
+
from canonicaljson import encode_canonical_json
+
+from collections import namedtuple
+
+import itertools
import logging
logger = logging.getLogger(__name__)
+_TransactionRow = namedtuple(
+ "_TransactionRow", (
+ "id", "transaction_id", "destination", "ts", "response_code",
+ "response_json",
+ )
+)
+
+_UpdateTransactionRow = namedtuple(
+ "_TransactionRow", (
+ "response_code", "response_json",
+ )
+)
+
+
class TransactionStore(SQLBaseStore):
"""A collection of queries for handling PDUs.
"""
+ def __init__(self, hs):
+ super(TransactionStore, self).__init__(hs)
+
+ # New transactions that are currently in flights
+ self.inflight_transactions = {}
+
+ # Newly delievered transactions that *weren't* persisted while in flight
+ self.new_delivered_transactions = {}
+
+ # Newly delivered transactions that *were* persisted while in flight
+ self.update_delivered_transactions = {}
+
+ self.last_transaction = {}
+
+ reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns)
+ hs.get_clock().looping_call(
+ self._persist_in_mem_txns,
+ 1000,
+ )
+
def get_received_txn_response(self, transaction_id, origin):
"""For an incoming transaction from a given origin, check if we have
already responded to it. If so, return the response code and response
@@ -108,17 +148,30 @@ class TransactionStore(SQLBaseStore):
list: A list of previous transaction ids.
"""
- return self.runInteraction(
- "prep_send_transaction",
- self._prep_send_transaction,
- transaction_id, destination, origin_server_ts
+ auto_id = self._transaction_id_gen.get_next()
+
+ txn_row = _TransactionRow(
+ id=auto_id,
+ transaction_id=transaction_id,
+ destination=destination,
+ ts=origin_server_ts,
+ response_code=0,
+ response_json=None,
)
- def _prep_send_transaction(self, txn, transaction_id, destination,
- origin_server_ts):
+ self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row
- next_id = self._transaction_id_gen.get_next()
+ prev_txn = self.last_transaction.get(destination)
+ if prev_txn:
+ return defer.succeed(prev_txn)
+ else:
+ return self.runInteraction(
+ "_get_prevs_txn",
+ self._get_prevs_txn,
+ destination,
+ )
+ def _get_prevs_txn(self, txn, destination):
# First we find out what the prev_txns should be.
# Since we know that we are only sending one transaction at a time,
# we can simply take the last one.
@@ -133,23 +186,6 @@ class TransactionStore(SQLBaseStore):
prev_txns = [r["transaction_id"] for r in results]
- # Actually add the new transaction to the sent_transactions table.
-
- self._simple_insert_txn(
- txn,
- table="sent_transactions",
- values={
- "id": next_id,
- "transaction_id": transaction_id,
- "destination": destination,
- "ts": origin_server_ts,
- "response_code": 0,
- "response_json": None,
- }
- )
-
- # TODO Update the tx id -> pdu id mapping
-
return prev_txns
def delivered_txn(self, transaction_id, destination, code, response_dict):
@@ -161,27 +197,23 @@ class TransactionStore(SQLBaseStore):
code (int)
response_json (str)
"""
- return self.runInteraction(
- "delivered_txn",
- self._delivered_txn,
- transaction_id, destination, code,
- buffer(encode_canonical_json(response_dict)),
- )
- def _delivered_txn(self, txn, transaction_id, destination,
- code, response_json):
- self._simple_update_one_txn(
- txn,
- table="sent_transactions",
- keyvalues={
- "transaction_id": transaction_id,
- "destination": destination,
- },
- updatevalues={
- "response_code": code,
- "response_json": None, # For now, don't persist response_json
- }
- )
+ txn_row = self.inflight_transactions.get(
+ destination, {}
+ ).pop(transaction_id, None)
+
+ self.last_transaction[destination] = transaction_id
+
+ if txn_row:
+ d = self.new_delivered_transactions.setdefault(destination, {})
+ d[transaction_id] = txn_row._replace(
+ response_code=code,
+ response_json=None, # For now, don't persist response
+ )
+ else:
+ d = self.update_delivered_transactions.setdefault(destination, {})
+ # For now, don't persist response
+ d[transaction_id] = _UpdateTransactionRow(code, None)
def get_transactions_after(self, transaction_id, destination):
"""Get all transactions after a given local transaction_id.
@@ -305,3 +337,48 @@ class TransactionStore(SQLBaseStore):
txn.execute(query, (self._clock.time_msec(),))
return self.cursor_to_dict(txn)
+
+ @defer.inlineCallbacks
+ def _persist_in_mem_txns(self):
+ try:
+ inflight = self.inflight_transactions
+ new_delivered = self.new_delivered_transactions
+ update_delivered = self.update_delivered_transactions
+
+ self.inflight_transactions = {}
+ self.new_delivered_transactions = {}
+ self.update_delivered_transactions = {}
+
+ full_rows = [
+ row._asdict()
+ for txn_map in itertools.chain(inflight.values(), new_delivered.values())
+ for row in txn_map.values()
+ ]
+
+ def f(txn):
+ if full_rows:
+ self._simple_insert_many_txn(
+ txn=txn,
+ table="sent_transactions",
+ values=full_rows
+ )
+
+ for dest, txn_map in update_delivered.items():
+ for txn_id, update_row in txn_map.items():
+ self._simple_update_one_txn(
+ txn,
+ table="sent_transactions",
+ keyvalues={
+ "transaction_id": txn_id,
+ "destination": dest,
+ },
+ updatevalues={
+ "response_code": update_row.response_code,
+ "response_json": None, # For now, don't persist response
+ }
+ )
+
+ if full_rows or update_delivered:
+ yield self.runInteraction("_persist_in_mem_txns", f)
+ except:
+ logger.exception("Failed to persist transactions!")
|