diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 045ae6c03f..6928a213e8 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -17,7 +17,7 @@ from twisted.internet import defer
from .appservice import (
ApplicationServiceStore, ApplicationServiceTransactionStore
)
-from ._base import Cache
+from ._base import Cache, LoggingTransaction
from .directory import DirectoryStore
from .events import EventsStore
from .presence import PresenceStore, UserPresenceState
@@ -44,6 +44,7 @@ from .receipts import ReceiptsStore
from .search import SearchStore
from .tags import TagsStore
from .account_data import AccountDataStore
+from .openid import OpenIdStore
from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator
@@ -81,11 +82,13 @@ class DataStore(RoomMemberStore, RoomStore,
SearchStore,
TagsStore,
AccountDataStore,
- EventPushActionsStore
+ EventPushActionsStore,
+ OpenIdStore,
):
def __init__(self, db_conn, hs):
self.hs = hs
+ self._clock = hs.get_clock()
self.database_engine = hs.database_engine
self.client_ip_last_seen = Cache(
@@ -114,6 +117,7 @@ class DataStore(RoomMemberStore, RoomStore,
self._state_groups_id_gen = StreamIdGenerator(db_conn, "state_groups", "id")
self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id")
+ self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
self._push_rules_stream_id_gen = ChainedIdGenerator(
@@ -145,7 +149,7 @@ class DataStore(RoomMemberStore, RoomStore,
"AccountDataAndTagsChangeCache", account_max,
)
- self.__presence_on_startup = self._get_active_presence(db_conn)
+ self._presence_on_startup = self._get_active_presence(db_conn)
presence_cache_prefill, min_presence_val = self._get_cache_dict(
db_conn, "presence_stream",
@@ -170,11 +174,24 @@ class DataStore(RoomMemberStore, RoomStore,
prefilled_cache=push_rules_prefill,
)
+ cur = LoggingTransaction(
+ db_conn.cursor(),
+ name="_find_stream_orderings_for_times_txn",
+ database_engine=self.database_engine,
+ after_callbacks=[]
+ )
+ self._find_stream_orderings_for_times_txn(cur)
+ cur.close()
+
+ self.find_stream_orderings_looping_call = self._clock.looping_call(
+ self._find_stream_orderings_for_times, 60 * 60 * 1000
+ )
+
super(DataStore, self).__init__(hs)
def take_presence_startup_info(self):
- active_on_startup = self.__presence_on_startup
- self.__presence_on_startup = None
+ active_on_startup = self._presence_on_startup
+ self._presence_on_startup = None
return active_on_startup
def _get_active_presence(self, db_conn):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 1e27c2c0ce..32c6677d47 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -152,8 +152,8 @@ class SQLBaseStore(object):
def __init__(self, hs):
self.hs = hs
- self._db_pool = hs.get_db_pool()
self._clock = hs.get_clock()
+ self._db_pool = hs.get_db_pool()
self._previous_txn_total_time = 0
self._current_txn_total_time = 0
@@ -453,7 +453,9 @@ class SQLBaseStore(object):
keyvalues (dict): The unique key tables and their new values
values (dict): The nonunique columns and their new values
insertion_values (dict): key/values to use when inserting
- Returns: A deferred
+ Returns:
+ Deferred(bool): True if a new entry was created, False if an
+ existing one was updated.
"""
return self.runInteraction(
desc,
@@ -498,6 +500,10 @@ class SQLBaseStore(object):
)
txn.execute(sql, allvalues.values())
+ return True
+ else:
+ return False
+
def _simple_select_one(self, table, keyvalues, retcols,
allow_none=False, desc="_simple_select_one"):
"""Executes a SELECT query on the named table, which is expected to
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index 7a7fbf1e52..ec7e8d40d2 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -16,6 +16,8 @@
from ._base import SQLBaseStore
from twisted.internet import defer
+from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
+
import ujson as json
import logging
@@ -24,6 +26,7 @@ logger = logging.getLogger(__name__)
class AccountDataStore(SQLBaseStore):
+ @cached()
def get_account_data_for_user(self, user_id):
"""Get all the client account_data for a user.
@@ -60,6 +63,47 @@ class AccountDataStore(SQLBaseStore):
"get_account_data_for_user", get_account_data_for_user_txn
)
+ @cachedInlineCallbacks(num_args=2)
+ def get_global_account_data_by_type_for_user(self, data_type, user_id):
+ """
+ Returns:
+ Deferred: A dict
+ """
+ result = yield self._simple_select_one_onecol(
+ table="account_data",
+ keyvalues={
+ "user_id": user_id,
+ "account_data_type": data_type,
+ },
+ retcol="content",
+ desc="get_global_account_data_by_type_for_user",
+ allow_none=True,
+ )
+
+ if result:
+ defer.returnValue(json.loads(result))
+ else:
+ defer.returnValue(None)
+
+ @cachedList(cached_method_name="get_global_account_data_by_type_for_user",
+ num_args=2, list_name="user_ids", inlineCallbacks=True)
+ def get_global_account_data_by_type_for_users(self, data_type, user_ids):
+ rows = yield self._simple_select_many_batch(
+ table="account_data",
+ column="user_id",
+ iterable=user_ids,
+ keyvalues={
+ "account_data_type": data_type,
+ },
+ retcols=("user_id", "content",),
+ desc="get_global_account_data_by_type_for_users",
+ )
+
+ defer.returnValue({
+ row["user_id"]: json.loads(row["content"]) if row["content"] else None
+ for row in rows
+ })
+
def get_account_data_for_room(self, user_id, room_id):
"""Get all the client account_data for a user for a room.
@@ -193,6 +237,7 @@ class AccountDataStore(SQLBaseStore):
self._account_data_stream_cache.entity_has_changed,
user_id, next_id,
)
+ txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
self._update_max_stream_id(txn, next_id)
with self._account_data_id_gen.get_next() as next_id:
@@ -232,6 +277,11 @@ class AccountDataStore(SQLBaseStore):
self._account_data_stream_cache.entity_has_changed,
user_id, next_id,
)
+ txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
+ txn.call_after(
+ self.get_global_account_data_by_type_for_user.invalidate,
+ (account_data_type, user_id,)
+ )
self._update_max_stream_id(txn, next_id)
with self._account_data_id_gen.get_next() as next_id:
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 371600eebb..feb9d228ae 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -13,16 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-import urllib
-import yaml
import simplejson as json
from twisted.internet import defer
from synapse.api.constants import Membership
-from synapse.appservice import ApplicationService, AppServiceTransaction
-from synapse.config._base import ConfigError
+from synapse.appservice import AppServiceTransaction
+from synapse.config.appservice import load_appservices
from synapse.storage.roommember import RoomsForUser
-from synapse.types import UserID
from ._base import SQLBaseStore
@@ -34,7 +31,7 @@ class ApplicationServiceStore(SQLBaseStore):
def __init__(self, hs):
super(ApplicationServiceStore, self).__init__(hs)
self.hostname = hs.hostname
- self.services_cache = ApplicationServiceStore.load_appservices(
+ self.services_cache = load_appservices(
hs.hostname,
hs.config.app_service_config_files
)
@@ -144,102 +141,6 @@ class ApplicationServiceStore(SQLBaseStore):
return rooms_for_user_matching_user_id
- @classmethod
- def _load_appservice(cls, hostname, as_info, config_filename):
- required_string_fields = [
- "id", "url", "as_token", "hs_token", "sender_localpart"
- ]
- for field in required_string_fields:
- if not isinstance(as_info.get(field), basestring):
- raise KeyError("Required string field: '%s' (%s)" % (
- field, config_filename,
- ))
-
- localpart = as_info["sender_localpart"]
- if urllib.quote(localpart) != localpart:
- raise ValueError(
- "sender_localpart needs characters which are not URL encoded."
- )
- user = UserID(localpart, hostname)
- user_id = user.to_string()
-
- # namespace checks
- if not isinstance(as_info.get("namespaces"), dict):
- raise KeyError("Requires 'namespaces' object.")
- for ns in ApplicationService.NS_LIST:
- # specific namespaces are optional
- if ns in as_info["namespaces"]:
- # expect a list of dicts with exclusive and regex keys
- for regex_obj in as_info["namespaces"][ns]:
- if not isinstance(regex_obj, dict):
- raise ValueError(
- "Expected namespace entry in %s to be an object,"
- " but got %s", ns, regex_obj
- )
- if not isinstance(regex_obj.get("regex"), basestring):
- raise ValueError(
- "Missing/bad type 'regex' key in %s", regex_obj
- )
- if not isinstance(regex_obj.get("exclusive"), bool):
- raise ValueError(
- "Missing/bad type 'exclusive' key in %s", regex_obj
- )
- return ApplicationService(
- token=as_info["as_token"],
- url=as_info["url"],
- namespaces=as_info["namespaces"],
- hs_token=as_info["hs_token"],
- sender=user_id,
- id=as_info["id"],
- )
-
- @classmethod
- def load_appservices(cls, hostname, config_files):
- """Returns a list of Application Services from the config files."""
- if not isinstance(config_files, list):
- logger.warning(
- "Expected %s to be a list of AS config files.", config_files
- )
- return []
-
- # Dicts of value -> filename
- seen_as_tokens = {}
- seen_ids = {}
-
- appservices = []
-
- for config_file in config_files:
- try:
- with open(config_file, 'r') as f:
- appservice = ApplicationServiceStore._load_appservice(
- hostname, yaml.load(f), config_file
- )
- if appservice.id in seen_ids:
- raise ConfigError(
- "Cannot reuse ID across application services: "
- "%s (files: %s, %s)" % (
- appservice.id, config_file, seen_ids[appservice.id],
- )
- )
- seen_ids[appservice.id] = config_file
- if appservice.token in seen_as_tokens:
- raise ConfigError(
- "Cannot reuse as_token across application services: "
- "%s (files: %s, %s)" % (
- appservice.token,
- config_file,
- seen_as_tokens[appservice.token],
- )
- )
- seen_as_tokens[appservice.token] = config_file
- logger.info("Loaded application service: %s", appservice)
- appservices.append(appservice)
- except Exception as e:
- logger.error("Failed to load appservice from '%s'", config_file)
- logger.exception(e)
- raise
- return appservices
-
class ApplicationServiceTransactionStore(SQLBaseStore):
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 86a98b6f11..940e11d7a2 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -24,6 +24,10 @@ logger = logging.getLogger(__name__)
class EventPushActionsStore(SQLBaseStore):
+ def __init__(self, hs):
+ self.stream_ordering_month_ago = None
+ super(EventPushActionsStore, self).__init__(hs)
+
def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
"""
Args:
@@ -115,19 +119,23 @@ class EventPushActionsStore(SQLBaseStore):
@defer.inlineCallbacks
def get_unread_push_actions_for_user_in_range(self, user_id,
min_stream_ordering,
- max_stream_ordering=None):
+ max_stream_ordering=None,
+ limit=20):
def get_after_receipt(txn):
sql = (
- "SELECT ep.event_id, ep.stream_ordering, ep.actions "
- "FROM event_push_actions AS ep, ("
- " SELECT room_id, user_id,"
- " max(topological_ordering) as topological_ordering,"
- " max(stream_ordering) as stream_ordering"
+ "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, "
+ "e.received_ts "
+ "FROM ("
+ " SELECT room_id, user_id, "
+ " max(topological_ordering) as topological_ordering, "
+ " max(stream_ordering) as stream_ordering "
" FROM events"
" NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'"
" GROUP BY room_id, user_id"
- ") AS rl "
- "WHERE"
+ ") AS rl,"
+ " event_push_actions AS ep"
+ " INNER JOIN events AS e USING (room_id, event_id)"
+ " WHERE"
" ep.room_id = rl.room_id"
" AND ("
" ep.topological_ordering > rl.topological_ordering"
@@ -144,7 +152,8 @@ class EventPushActionsStore(SQLBaseStore):
if max_stream_ordering is not None:
sql += " AND ep.stream_ordering <= ?"
args.append(max_stream_ordering)
- sql += " ORDER BY ep.stream_ordering ASC"
+ sql += " ORDER BY ep.stream_ordering ASC LIMIT ?"
+ args.append(limit)
txn.execute(sql, args)
return txn.fetchall()
after_read_receipt = yield self.runInteraction(
@@ -153,11 +162,13 @@ class EventPushActionsStore(SQLBaseStore):
def get_no_receipt(txn):
sql = (
- "SELECT ep.event_id, ep.stream_ordering, ep.actions "
- "FROM event_push_actions AS ep "
- "WHERE ep.room_id not in ("
+ "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
+ " e.received_ts"
+ " FROM event_push_actions AS ep"
+ " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
+ " WHERE ep.room_id not in ("
" SELECT room_id FROM events NATURAL JOIN receipts_linearized"
- " WHERE receipt_type = 'm.read' AND user_id = ? "
+ " WHERE receipt_type = 'm.read' AND user_id = ?"
" GROUP BY room_id"
") AND ep.user_id = ? AND ep.stream_ordering > ?"
)
@@ -175,12 +186,30 @@ class EventPushActionsStore(SQLBaseStore):
defer.returnValue([
{
"event_id": row[0],
- "stream_ordering": row[1],
- "actions": json.loads(row[2]),
+ "room_id": row[1],
+ "stream_ordering": row[2],
+ "actions": json.loads(row[3]),
+ "received_ts": row[4],
} for row in after_read_receipt + no_read_receipt
])
@defer.inlineCallbacks
+ def get_time_of_last_push_action_before(self, stream_ordering):
+ def f(txn):
+ sql = (
+ "SELECT e.received_ts"
+ " FROM event_push_actions AS ep"
+ " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
+ " WHERE ep.stream_ordering > ?"
+ " ORDER BY ep.stream_ordering ASC"
+ " LIMIT 1"
+ )
+ txn.execute(sql, (stream_ordering,))
+ return txn.fetchone()
+ result = yield self.runInteraction("get_time_of_last_push_action_before", f)
+ defer.returnValue(result[0] if result else None)
+
+ @defer.inlineCallbacks
def get_latest_push_action_stream_ordering(self):
def f(txn):
txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions")
@@ -201,6 +230,93 @@ class EventPushActionsStore(SQLBaseStore):
(room_id, event_id)
)
+ def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
+ topological_ordering):
+ """
+ Purges old, stale push actions for a user and room before a given
+ topological_ordering
+ Args:
+ txn: The transcation
+ room_id: Room ID to delete from
+ user_id: user ID to delete for
+ topological_ordering: The lowest topological ordering which will
+ not be deleted.
+ """
+ txn.call_after(
+ self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+ (room_id, user_id, )
+ )
+
+ # We need to join on the events table to get the received_ts for
+ # event_push_actions and sqlite won't let us use a join in a delete so
+ # we can't just delete where received_ts < x. Furthermore we can
+ # only identify event_push_actions by a tuple of room_id, event_id
+ # we we can't use a subquery.
+ # Instead, we look up the stream ordering for the last event in that
+ # room received before the threshold time and delete event_push_actions
+ # in the room with a stream_odering before that.
+ txn.execute(
+ "DELETE FROM event_push_actions "
+ " WHERE user_id = ? AND room_id = ? AND "
+ " topological_ordering < ? AND stream_ordering < ?",
+ (user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
+ )
+
+ @defer.inlineCallbacks
+ def _find_stream_orderings_for_times(self):
+ yield self.runInteraction(
+ "_find_stream_orderings_for_times",
+ self._find_stream_orderings_for_times_txn
+ )
+
+ def _find_stream_orderings_for_times_txn(self, txn):
+ logger.info("Searching for stream ordering 1 month ago")
+ self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
+ txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
+ )
+ logger.info(
+ "Found stream ordering 1 month ago: it's %d",
+ self.stream_ordering_month_ago
+ )
+
+ def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
+ """
+ Find the stream_ordering of the first event that was received after
+ a given timestamp. This is relatively slow as there is no index on
+ received_ts but we can then use this to delete push actions before
+ this.
+
+ received_ts must necessarily be in the same order as stream_ordering
+ and stream_ordering is indexed, so we manually binary search using
+ stream_ordering
+ """
+ txn.execute("SELECT MAX(stream_ordering) FROM events")
+ max_stream_ordering = txn.fetchone()[0]
+
+ if max_stream_ordering is None:
+ return 0
+
+ range_start = 0
+ range_end = max_stream_ordering
+
+ sql = (
+ "SELECT received_ts FROM events"
+ " WHERE stream_ordering > ?"
+ " ORDER BY stream_ordering"
+ " LIMIT 1"
+ )
+
+ while range_end - range_start > 1:
+ middle = int((range_end + range_start) / 2)
+ txn.execute(sql, (middle,))
+ middle_ts = txn.fetchone()[0]
+ if ts > middle_ts:
+ range_start = middle
+ else:
+ range_end = middle
+
+ return range_end
+
def _action_has_highlight(actions):
for action in actions:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 21487724ed..2b3f79577b 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -19,12 +19,14 @@ from twisted.internet import defer, reactor
from synapse.events import FrozenEvent, USE_FROZEN_DICTS
from synapse.events.utils import prune_event
+from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
from synapse.util.logutils import log_function
from synapse.api.constants import EventTypes
from canonicaljson import encode_canonical_json
-from collections import namedtuple
+from collections import deque, namedtuple
+
import logging
import math
@@ -50,28 +52,169 @@ EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for requests for events
EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events
+class _EventPeristenceQueue(object):
+ """Queues up events so that they can be persisted in bulk with only one
+ concurrent transaction per room.
+ """
+
+ _EventPersistQueueItem = namedtuple("_EventPersistQueueItem", (
+ "events_and_contexts", "current_state", "backfilled", "deferred",
+ ))
+
+ def __init__(self):
+ self._event_persist_queues = {}
+ self._currently_persisting_rooms = set()
+
+ def add_to_queue(self, room_id, events_and_contexts, backfilled, current_state):
+ """Add events to the queue, with the given persist_event options.
+ """
+ queue = self._event_persist_queues.setdefault(room_id, deque())
+ if queue:
+ end_item = queue[-1]
+ if end_item.current_state or current_state:
+ # We perist events with current_state set to True one at a time
+ pass
+ if end_item.backfilled == backfilled:
+ end_item.events_and_contexts.extend(events_and_contexts)
+ return end_item.deferred.observe()
+
+ deferred = ObservableDeferred(defer.Deferred())
+
+ queue.append(self._EventPersistQueueItem(
+ events_and_contexts=events_and_contexts,
+ backfilled=backfilled,
+ current_state=current_state,
+ deferred=deferred,
+ ))
+
+ return deferred.observe()
+
+ def handle_queue(self, room_id, per_item_callback):
+ """Attempts to handle the queue for a room if not already being handled.
+
+ The given callback will be invoked with for each item in the queue,1
+ of type _EventPersistQueueItem. The per_item_callback will continuously
+ be called with new items, unless the queue becomnes empty. The return
+ value of the function will be given to the deferreds waiting on the item,
+ exceptions will be passed to the deferres as well.
+
+ This function should therefore be called whenever anything is added
+ to the queue.
+
+ If another callback is currently handling the queue then it will not be
+ invoked.
+ """
+
+ if room_id in self._currently_persisting_rooms:
+ return
+
+ self._currently_persisting_rooms.add(room_id)
+
+ @defer.inlineCallbacks
+ def handle_queue_loop():
+ try:
+ queue = self._get_drainining_queue(room_id)
+ for item in queue:
+ try:
+ ret = yield per_item_callback(item)
+ item.deferred.callback(ret)
+ except Exception as e:
+ item.deferred.errback(e)
+ finally:
+ queue = self._event_persist_queues.pop(room_id, None)
+ if queue:
+ self._event_persist_queues[room_id] = queue
+ self._currently_persisting_rooms.discard(room_id)
+
+ preserve_fn(handle_queue_loop)()
+
+ def _get_drainining_queue(self, room_id):
+ queue = self._event_persist_queues.setdefault(room_id, deque())
+
+ try:
+ while True:
+ yield queue.popleft()
+ except IndexError:
+ # Queue has been drained.
+ pass
+
+
class EventsStore(SQLBaseStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
def __init__(self, hs):
super(EventsStore, self).__init__(hs)
+ self._clock = hs.get_clock()
self.register_background_update_handler(
self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
)
- @defer.inlineCallbacks
+ self._event_persist_queue = _EventPeristenceQueue()
+
def persist_events(self, events_and_contexts, backfilled=False):
"""
Write events to the database
Args:
events_and_contexts: list of tuples of (event, context)
backfilled: ?
+ """
+ partitioned = {}
+ for event, ctx in events_and_contexts:
+ partitioned.setdefault(event.room_id, []).append((event, ctx))
+
+ deferreds = []
+ for room_id, evs_ctxs in partitioned.items():
+ d = self._event_persist_queue.add_to_queue(
+ room_id, evs_ctxs,
+ backfilled=backfilled,
+ current_state=None,
+ )
+ deferreds.append(d)
- Returns: Tuple of stream_orderings where the first is the minimum and
- last is the maximum stream ordering assigned to the events when
- persisting.
+ for room_id in partitioned.keys():
+ self._maybe_start_persisting(room_id)
- """
+ return defer.gatherResults(deferreds, consumeErrors=True)
+
+ @defer.inlineCallbacks
+ @log_function
+ def persist_event(self, event, context, current_state=None, backfilled=False):
+ deferred = self._event_persist_queue.add_to_queue(
+ event.room_id, [(event, context)],
+ backfilled=backfilled,
+ current_state=current_state,
+ )
+
+ self._maybe_start_persisting(event.room_id)
+
+ yield deferred
+
+ max_persisted_id = yield self._stream_id_gen.get_current_token()
+ defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id))
+
+ def _maybe_start_persisting(self, room_id):
+ @defer.inlineCallbacks
+ def persisting_queue(item):
+ if item.current_state:
+ for event, context in item.events_and_contexts:
+ # There should only ever be one item in
+ # events_and_contexts when current_state is
+ # not None
+ yield self._persist_event(
+ event, context,
+ current_state=item.current_state,
+ backfilled=item.backfilled,
+ )
+ else:
+ yield self._persist_events(
+ item.events_and_contexts,
+ backfilled=item.backfilled,
+ )
+
+ self._event_persist_queue.handle_queue(room_id, persisting_queue)
+
+ @defer.inlineCallbacks
+ def _persist_events(self, events_and_contexts, backfilled=False):
if not events_and_contexts:
return
@@ -118,8 +261,7 @@ class EventsStore(SQLBaseStore):
@defer.inlineCallbacks
@log_function
- def persist_event(self, event, context, current_state=None, backfilled=False):
-
+ def _persist_event(self, event, context, current_state=None, backfilled=False):
try:
with self._stream_id_gen.get_next() as stream_ordering:
with self._state_groups_id_gen.get_next() as state_group_id:
@@ -136,9 +278,6 @@ class EventsStore(SQLBaseStore):
except _RollbackButIsFineException:
pass
- max_persisted_id = yield self._stream_id_gen.get_current_token()
- defer.returnValue((stream_ordering, max_persisted_id))
-
@defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,
get_prev_content=False, allow_rejected=False,
@@ -203,9 +342,6 @@ class EventsStore(SQLBaseStore):
txn.call_after(self._get_current_state_for_key.invalidate_all)
txn.call_after(self.get_rooms_for_user.invalidate_all)
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
- txn.call_after(
- self.get_users_with_pushers_in_room.invalidate, (event.room_id,)
- )
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))
@@ -427,6 +563,7 @@ class EventsStore(SQLBaseStore):
"outlier": event.internal_metadata.is_outlier(),
"content": encode_json(event.content).decode("UTF-8"),
"origin_server_ts": int(event.origin_server_ts),
+ "received_ts": self._clock.time_msec(),
}
for event, _ in events_and_contexts
],
@@ -1143,6 +1280,12 @@ class EventsStore(SQLBaseStore):
current_backfill_id, current_forward_id, limit):
"""Get all the new events that have arrived at the server either as
new events or as backfilled events"""
+ have_backfill_events = last_backfill_id != current_backfill_id
+ have_forward_events = last_forward_id != current_forward_id
+
+ if not have_backfill_events and not have_forward_events:
+ return defer.succeed(AllNewEventsResult([], [], [], [], []))
+
def get_all_new_events_txn(txn):
sql = (
"SELECT e.stream_ordering, ej.internal_metadata, ej.json, eg.state_group"
@@ -1155,7 +1298,7 @@ class EventsStore(SQLBaseStore):
" ORDER BY e.stream_ordering ASC"
" LIMIT ?"
)
- if last_forward_id != current_forward_id:
+ if have_forward_events:
txn.execute(sql, (last_forward_id, current_forward_id, limit))
new_forward_events = txn.fetchall()
@@ -1199,7 +1342,7 @@ class EventsStore(SQLBaseStore):
" ORDER BY e.stream_ordering DESC"
" LIMIT ?"
)
- if last_backfill_id != current_backfill_id:
+ if have_backfill_events:
txn.execute(sql, (-last_backfill_id, -current_backfill_id, limit))
new_backfill_events = txn.fetchall()
diff --git a/synapse/storage/openid.py b/synapse/storage/openid.py
new file mode 100644
index 0000000000..5dabb607bd
--- /dev/null
+++ b/synapse/storage/openid.py
@@ -0,0 +1,32 @@
+from ._base import SQLBaseStore
+
+
+class OpenIdStore(SQLBaseStore):
+ def insert_open_id_token(self, token, ts_valid_until_ms, user_id):
+ return self._simple_insert(
+ table="open_id_tokens",
+ values={
+ "token": token,
+ "ts_valid_until_ms": ts_valid_until_ms,
+ "user_id": user_id,
+ },
+ desc="insert_open_id_token"
+ )
+
+ def get_user_id_for_open_id_token(self, token, ts_now_ms):
+ def get_user_id_for_token_txn(txn):
+ sql = (
+ "SELECT user_id FROM open_id_tokens"
+ " WHERE token = ? AND ? <= ts_valid_until_ms"
+ )
+
+ txn.execute(sql, (token, ts_now_ms))
+
+ rows = txn.fetchall()
+ if not rows:
+ return None
+ else:
+ return rows[0][0]
+ return self.runInteraction(
+ "get_user_id_for_token", get_user_id_for_token_txn
+ )
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 57f14fd12b..c8487c8838 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 31
+SCHEMA_VERSION = 32
dir_path = os.path.abspath(os.path.dirname(__file__))
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 07f5fae8dd..3fab57a7e8 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -149,6 +149,7 @@ class PresenceStore(SQLBaseStore):
"status_msg",
"currently_active",
),
+ desc="get_presence_for_users",
)
for row in rows:
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index d2bf7f2aec..786d6f6d67 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -14,7 +14,8 @@
# limitations under the License.
from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
+from synapse.push.baserules import list_with_base_rules
from twisted.internet import defer
import logging
@@ -23,8 +24,31 @@ import simplejson as json
logger = logging.getLogger(__name__)
+def _load_rules(rawrules, enabled_map):
+ ruleslist = []
+ for rawrule in rawrules:
+ rule = dict(rawrule)
+ rule["conditions"] = json.loads(rawrule["conditions"])
+ rule["actions"] = json.loads(rawrule["actions"])
+ ruleslist.append(rule)
+
+ # We're going to be mutating this a lot, so do a deep copy
+ rules = list(list_with_base_rules(ruleslist))
+
+ for i, rule in enumerate(rules):
+ rule_id = rule['rule_id']
+ if rule_id in enabled_map:
+ if rule.get('enabled', True) != bool(enabled_map[rule_id]):
+ # Rules are cached across users.
+ rule = dict(rule)
+ rule['enabled'] = bool(enabled_map[rule_id])
+ rules[i] = rule
+
+ return rules
+
+
class PushRuleStore(SQLBaseStore):
- @cachedInlineCallbacks()
+ @cachedInlineCallbacks(lru=True)
def get_push_rules_for_user(self, user_id):
rows = yield self._simple_select_list(
table="push_rules",
@@ -42,9 +66,13 @@ class PushRuleStore(SQLBaseStore):
key=lambda row: (-int(row["priority_class"]), -int(row["priority"]))
)
- defer.returnValue(rows)
+ enabled_map = yield self.get_push_rules_enabled_for_user(user_id)
- @cachedInlineCallbacks()
+ rules = _load_rules(rows, enabled_map)
+
+ defer.returnValue(rules)
+
+ @cachedInlineCallbacks(lru=True)
def get_push_rules_enabled_for_user(self, user_id):
results = yield self._simple_select_list(
table="push_rules_enable",
@@ -60,12 +88,16 @@ class PushRuleStore(SQLBaseStore):
r['rule_id']: False if r['enabled'] == 0 else True for r in results
})
- @defer.inlineCallbacks
+ @cachedList(cached_method_name="get_push_rules_for_user",
+ list_name="user_ids", num_args=1, inlineCallbacks=True)
def bulk_get_push_rules(self, user_ids):
if not user_ids:
defer.returnValue({})
- results = {}
+ results = {
+ user_id: []
+ for user_id in user_ids
+ }
rows = yield self._simple_select_many_batch(
table="push_rules",
@@ -75,18 +107,32 @@ class PushRuleStore(SQLBaseStore):
desc="bulk_get_push_rules",
)
- rows.sort(key=lambda e: (-e["priority_class"], -e["priority"]))
+ rows.sort(
+ key=lambda row: (-int(row["priority_class"]), -int(row["priority"]))
+ )
for row in rows:
results.setdefault(row['user_name'], []).append(row)
+
+ enabled_map_by_user = yield self.bulk_get_push_rules_enabled(user_ids)
+
+ for user_id, rules in results.items():
+ results[user_id] = _load_rules(
+ rules, enabled_map_by_user.get(user_id, {})
+ )
+
defer.returnValue(results)
- @defer.inlineCallbacks
+ @cachedList(cached_method_name="get_push_rules_enabled_for_user",
+ list_name="user_ids", num_args=1, inlineCallbacks=True)
def bulk_get_push_rules_enabled(self, user_ids):
if not user_ids:
defer.returnValue({})
- results = {}
+ results = {
+ user_id: {}
+ for user_id in user_ids
+ }
rows = yield self._simple_select_many_batch(
table="push_rules_enable",
@@ -96,7 +142,8 @@ class PushRuleStore(SQLBaseStore):
desc="bulk_get_push_rules_enabled",
)
for row in rows:
- results.setdefault(row['user_name'], {})[row['rule_id']] = row['enabled']
+ enabled = bool(row['enabled'])
+ results.setdefault(row['user_name'], {})[row['rule_id']] = enabled
defer.returnValue(results)
@defer.inlineCallbacks
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index e5755c0aea..a7d7c54d7e 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -18,7 +18,7 @@ from twisted.internet import defer
from canonicaljson import encode_canonical_json
-from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
import logging
import simplejson as json
@@ -106,6 +106,9 @@ class PusherStore(SQLBaseStore):
return self._pushers_id_gen.get_current_token()
def get_all_updated_pushers(self, last_id, current_id, limit):
+ if last_id == current_id:
+ return defer.succeed(([], []))
+
def get_all_updated_pushers_txn(txn):
sql = (
"SELECT id, user_name, access_token, profile_tag, kind,"
@@ -132,19 +135,35 @@ class PusherStore(SQLBaseStore):
"get_all_updated_pushers", get_all_updated_pushers_txn
)
- @cachedInlineCallbacks(num_args=1)
- def get_users_with_pushers_in_room(self, room_id):
- users = yield self.get_users_in_room(room_id)
-
+ @cachedInlineCallbacks(lru=True, num_args=1, max_entries=15000)
+ def get_if_user_has_pusher(self, user_id):
result = yield self._simple_select_many_batch(
table='pushers',
+ keyvalues={
+ 'user_name': 'user_id',
+ },
+ retcol='user_name',
+ desc='get_if_user_has_pusher',
+ allow_none=True,
+ )
+
+ defer.returnValue(bool(result))
+
+ @cachedList(cached_method_name="get_if_user_has_pusher",
+ list_name="user_ids", num_args=1, inlineCallbacks=True)
+ def get_if_users_have_pushers(self, user_ids):
+ rows = yield self._simple_select_many_batch(
+ table='pushers',
column='user_name',
- iterable=users,
+ iterable=user_ids,
retcols=['user_name'],
- desc='get_users_with_pushers_in_room'
+ desc='get_if_users_have_pushers'
)
- defer.returnValue([r['user_name'] for r in result])
+ result = {user_id: False for user_id in user_ids}
+ result.update({r['user_name']: True for r in rows})
+
+ defer.returnValue(result)
@defer.inlineCallbacks
def add_pusher(self, user_id, access_token, kind, app_id,
@@ -153,8 +172,7 @@ class PusherStore(SQLBaseStore):
profile_tag=""):
with self._pushers_id_gen.get_next() as stream_id:
def f(txn):
- txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
- return self._simple_upsert_txn(
+ newly_inserted = self._simple_upsert_txn(
txn,
"pushers",
{
@@ -175,11 +193,18 @@ class PusherStore(SQLBaseStore):
"id": stream_id,
},
)
- defer.returnValue((yield self.runInteraction("add_pusher", f)))
+ if newly_inserted:
+ # get_if_user_has_pusher only cares if the user has
+ # at least *one* pusher.
+ txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
+
+ yield self.runInteraction("add_pusher", f)
@defer.inlineCallbacks
def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
def delete_pusher_txn(txn, stream_id):
+ txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
+
self._simple_delete_one_txn(
txn,
"pushers",
@@ -191,6 +216,7 @@ class PusherStore(SQLBaseStore):
{"app_id": app_id, "pushkey": pushkey, "user_id": user_id},
{"stream_id": stream_id},
)
+
with self._pushers_id_gen.get_next() as stream_id:
yield self.runInteraction(
"delete_pusher", delete_pusher_txn, stream_id
@@ -230,3 +256,30 @@ class PusherStore(SQLBaseStore):
{'failing_since': failing_since},
desc="update_pusher_failing_since",
)
+
+ @defer.inlineCallbacks
+ def get_throttle_params_by_room(self, pusher_id):
+ res = yield self._simple_select_list(
+ "pusher_throttle",
+ {"pusher": pusher_id},
+ ["room_id", "last_sent_ts", "throttle_ms"],
+ desc="get_throttle_params_by_room"
+ )
+
+ params_by_room = {}
+ for row in res:
+ params_by_room[row["room_id"]] = {
+ "last_sent_ts": row["last_sent_ts"],
+ "throttle_ms": row["throttle_ms"]
+ }
+
+ defer.returnValue(params_by_room)
+
+ @defer.inlineCallbacks
+ def set_throttle_params(self, pusher_id, room_id, params):
+ yield self._simple_upsert(
+ "pusher_throttle",
+ {"pusher": pusher_id, "room_id": room_id},
+ params,
+ desc="set_throttle_params"
+ )
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 3b8805593e..8c26f39fbb 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -34,6 +34,26 @@ class ReceiptsStore(SQLBaseStore):
"ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token()
)
+ @cachedInlineCallbacks()
+ def get_users_with_read_receipts_in_room(self, room_id):
+ receipts = yield self.get_receipts_for_room(room_id, "m.read")
+ defer.returnValue(set(r['user_id'] for r in receipts))
+
+ def _invalidate_get_users_with_receipts_in_room(self, room_id, receipt_type,
+ user_id):
+ if receipt_type != "m.read":
+ return
+
+ # Returns an ObservableDeferred
+ res = self.get_users_with_read_receipts_in_room.cache.get((room_id,), None)
+
+ if res and res.called and user_id in res.result:
+ # We'd only be adding to the set, so no point invalidating if the
+ # user is already there
+ return
+
+ self.get_users_with_read_receipts_in_room.invalidate((room_id,))
+
@cached(num_args=2)
def get_receipts_for_room(self, room_id, receipt_type):
return self._simple_select_list(
@@ -100,7 +120,7 @@ class ReceiptsStore(SQLBaseStore):
defer.returnValue([ev for res in results.values() for ev in res])
- @cachedInlineCallbacks(num_args=3, max_entries=5000)
+ @cachedInlineCallbacks(num_args=3, max_entries=5000, lru=True, tree=True)
def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
"""Get receipts for a single room for sending to clients.
@@ -229,10 +249,14 @@ class ReceiptsStore(SQLBaseStore):
self.get_receipts_for_room.invalidate, (room_id, receipt_type)
)
txn.call_after(
+ self._invalidate_get_users_with_receipts_in_room,
+ room_id, receipt_type, user_id,
+ )
+ txn.call_after(
self.get_receipts_for_user.invalidate, (user_id, receipt_type)
)
# FIXME: This shouldn't invalidate the whole cache
- txn.call_after(self.get_linearized_receipts_for_room.invalidate_all)
+ txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
txn.call_after(
self._receipts_stream_cache.entity_has_changed,
@@ -244,6 +268,17 @@ class ReceiptsStore(SQLBaseStore):
(user_id, room_id, receipt_type)
)
+ res = self._simple_select_one_txn(
+ txn,
+ table="events",
+ retcols=["topological_ordering", "stream_ordering"],
+ keyvalues={"event_id": event_id},
+ allow_none=True
+ )
+
+ topological_ordering = int(res["topological_ordering"]) if res else None
+ stream_ordering = int(res["stream_ordering"]) if res else None
+
# We don't want to clobber receipts for more recent events, so we
# have to compare orderings of existing receipts
sql = (
@@ -255,16 +290,7 @@ class ReceiptsStore(SQLBaseStore):
txn.execute(sql, (room_id, receipt_type, user_id))
results = txn.fetchall()
- if results:
- res = self._simple_select_one_txn(
- txn,
- table="events",
- retcols=["topological_ordering", "stream_ordering"],
- keyvalues={"event_id": event_id},
- )
- topological_ordering = int(res["topological_ordering"])
- stream_ordering = int(res["stream_ordering"])
-
+ if results and topological_ordering:
for to, so, _ in results:
if int(to) > topological_ordering:
return False
@@ -294,6 +320,14 @@ class ReceiptsStore(SQLBaseStore):
}
)
+ if receipt_type == "m.read" and topological_ordering:
+ self._remove_old_push_actions_before_txn(
+ txn,
+ room_id=room_id,
+ user_id=user_id,
+ topological_ordering=topological_ordering,
+ )
+
return True
@defer.inlineCallbacks
@@ -364,10 +398,14 @@ class ReceiptsStore(SQLBaseStore):
self.get_receipts_for_room.invalidate, (room_id, receipt_type)
)
txn.call_after(
+ self._invalidate_get_users_with_receipts_in_room,
+ room_id, receipt_type, user_id,
+ )
+ txn.call_after(
self.get_receipts_for_user.invalidate, (user_id, receipt_type)
)
# FIXME: This shouldn't invalidate the whole cache
- txn.call_after(self.get_linearized_receipts_for_room.invalidate_all)
+ txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
self._simple_delete_txn(
txn,
@@ -391,6 +429,9 @@ class ReceiptsStore(SQLBaseStore):
)
def get_all_updated_receipts(self, last_id, current_id, limit=None):
+ if last_id == current_id:
+ return defer.succeed([])
+
def get_all_updated_receipts_txn(txn):
sql = (
"SELECT stream_id, room_id, receipt_type, user_id, event_id, data"
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 7af0cae6a5..bda84a744a 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -101,6 +101,7 @@ class RegistrationStore(SQLBaseStore):
make_guest,
appservice_id
)
+ self.get_user_by_id.invalidate((user_id,))
self.is_guest.invalidate((user_id,))
def _register(
@@ -156,6 +157,7 @@ class RegistrationStore(SQLBaseStore):
(next_id, user_id, token,)
)
+ @cached()
def get_user_by_id(self, user_id):
return self._simple_select_one(
table="users",
@@ -193,6 +195,7 @@ class RegistrationStore(SQLBaseStore):
}, {
'password_hash': password_hash
})
+ self.get_user_by_id.invalidate((user_id,))
@defer.inlineCallbacks
def user_delete_access_tokens(self, user_id, except_token_ids=[]):
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 70aa64fb31..26933e593a 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -23,6 +23,7 @@ from .engines import PostgresEngine, Sqlite3Engine
import collections
import logging
+import ujson as json
logger = logging.getLogger(__name__)
@@ -221,3 +222,20 @@ class RoomStore(SQLBaseStore):
aliases.extend(e.content['aliases'])
defer.returnValue((name, aliases))
+
+ def add_event_report(self, room_id, event_id, user_id, reason, content,
+ received_ts):
+ next_id = self._event_reports_id_gen.get_next()
+ return self._simple_insert(
+ table="event_reports",
+ values={
+ "id": next_id,
+ "received_ts": received_ts,
+ "room_id": room_id,
+ "event_id": event_id,
+ "user_id": user_id,
+ "reason": reason,
+ "content": json.dumps(content),
+ },
+ desc="add_event_report"
+ )
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 08a54cbdd1..64b4bd371b 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -21,7 +21,7 @@ from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.api.constants import Membership
-from synapse.types import UserID
+from synapse.types import get_domain_from_id
import logging
@@ -59,9 +59,6 @@ class RoomMemberStore(SQLBaseStore):
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
txn.call_after(
- self.get_users_with_pushers_in_room.invalidate, (event.room_id,)
- )
- txn.call_after(
self._membership_stream_cache.entity_has_changed,
event.state_key, event.internal_metadata.stream_ordering
)
@@ -137,24 +134,6 @@ class RoomMemberStore(SQLBaseStore):
return [r["user_id"] for r in rows]
return self.runInteraction("get_users_in_room", f)
- def get_room_members(self, room_id, membership=None):
- """Retrieve the current room member list for a room.
-
- Args:
- room_id (str): The room to get the list of members.
- membership (synapse.api.constants.Membership): The filter to apply
- to this list, or None to return all members with some state
- associated with this room.
- Returns:
- list of namedtuples representing the members in this room.
- """
- return self.runInteraction(
- "get_room_members",
- self._get_members_events_txn,
- room_id,
- membership=membership,
- ).addCallback(self._get_events)
-
@cached()
def get_invited_rooms_for_user(self, user_id):
""" Get all the rooms the user is invited to
@@ -259,26 +238,10 @@ class RoomMemberStore(SQLBaseStore):
return results
- @cached(max_entries=5000)
+ @cachedInlineCallbacks(max_entries=5000)
def get_joined_hosts_for_room(self, room_id):
- return self.runInteraction(
- "get_joined_hosts_for_room",
- self._get_joined_hosts_for_room_txn,
- room_id,
- )
-
- def _get_joined_hosts_for_room_txn(self, txn, room_id):
- rows = self._get_members_rows_txn(
- txn,
- room_id, membership=Membership.JOIN
- )
-
- joined_domains = set(
- UserID.from_string(r["user_id"]).domain
- for r in rows
- )
-
- return joined_domains
+ user_ids = yield self.get_users_in_room(room_id)
+ defer.returnValue(set(get_domain_from_id(uid) for uid in user_ids))
def _get_members_events_txn(self, txn, room_id, membership=None, user_id=None):
rows = self._get_members_rows_txn(
diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
index b417e3ac08..5b7d8d1ab5 100644
--- a/synapse/storage/schema/delta/30/as_users.py
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from synapse.storage.appservice import ApplicationServiceStore
+from synapse.config.appservice import load_appservices
logger = logging.getLogger(__name__)
@@ -38,7 +38,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
logger.warning("Could not get app_service_config_files from config")
pass
- appservices = ApplicationServiceStore.load_appservices(
+ appservices = load_appservices(
config.server_name, config_files
)
diff --git a/synapse/storage/schema/delta/32/events.sql b/synapse/storage/schema/delta/32/events.sql
new file mode 100644
index 0000000000..1dd0f9e170
--- /dev/null
+++ b/synapse/storage/schema/delta/32/events.sql
@@ -0,0 +1,16 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE events ADD COLUMN received_ts BIGINT;
diff --git a/synapse/storage/schema/delta/32/openid.sql b/synapse/storage/schema/delta/32/openid.sql
new file mode 100644
index 0000000000..36f37b11c8
--- /dev/null
+++ b/synapse/storage/schema/delta/32/openid.sql
@@ -0,0 +1,9 @@
+
+CREATE TABLE open_id_tokens (
+ token TEXT NOT NULL PRIMARY KEY,
+ ts_valid_until_ms bigint NOT NULL,
+ user_id TEXT NOT NULL,
+ UNIQUE (token)
+);
+
+CREATE index open_id_tokens_ts_valid_until_ms ON open_id_tokens(ts_valid_until_ms);
diff --git a/synapse/storage/schema/delta/32/pusher_throttle.sql b/synapse/storage/schema/delta/32/pusher_throttle.sql
new file mode 100644
index 0000000000..d86d30c13c
--- /dev/null
+++ b/synapse/storage/schema/delta/32/pusher_throttle.sql
@@ -0,0 +1,23 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE pusher_throttle(
+ pusher BIGINT NOT NULL,
+ room_id TEXT NOT NULL,
+ last_sent_ts BIGINT,
+ throttle_ms BIGINT,
+ PRIMARY KEY (pusher, room_id)
+);
diff --git a/synapse/storage/schema/delta/32/remove_indices.sql b/synapse/storage/schema/delta/32/remove_indices.sql
new file mode 100644
index 0000000000..f859be46a6
--- /dev/null
+++ b/synapse/storage/schema/delta/32/remove_indices.sql
@@ -0,0 +1,38 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- The following indices are redundant, other indices are equivalent or
+-- supersets
+DROP INDEX IF EXISTS events_room_id; -- Prefix of events_room_stream
+DROP INDEX IF EXISTS events_order; -- Prefix of events_order_topo_stream_room
+DROP INDEX IF EXISTS events_topological_ordering; -- Prefix of events_order_topo_stream_room
+DROP INDEX IF EXISTS events_stream_ordering; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS state_groups_id; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS event_to_state_groups_id; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS event_push_actions_room_id_event_id_user_id_profile_tag; -- Duplicate of UNIQUE CONSTRAINT
+
+DROP INDEX IF EXISTS event_destinations_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS st_extrem_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_content_hashes_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_signatures_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_edge_hashes_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS redactions_event_id; -- Duplicate of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS room_hosts_room_id; -- Prefix of UNIQUE CONSTRAINT
+
+-- The following indices were unused
+DROP INDEX IF EXISTS remote_media_cache_thumbnails_media_id;
+DROP INDEX IF EXISTS evauth_edges_auth_id;
+DROP INDEX IF EXISTS presence_stream_state;
diff --git a/synapse/storage/schema/delta/32/reports.sql b/synapse/storage/schema/delta/32/reports.sql
new file mode 100644
index 0000000000..d13609776f
--- /dev/null
+++ b/synapse/storage/schema/delta/32/reports.sql
@@ -0,0 +1,25 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE event_reports(
+ id BIGINT NOT NULL PRIMARY KEY,
+ received_ts BIGINT NOT NULL,
+ room_id TEXT NOT NULL,
+ event_id TEXT NOT NULL,
+ user_id TEXT NOT NULL,
+ reason TEXT,
+ content TEXT
+);
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index b10f2a5787..ea6823f18d 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -19,17 +19,24 @@ from ._base import SQLBaseStore
from unpaddedbase64 import encode_base64
from synapse.crypto.event_signing import compute_event_reference_hash
+from synapse.util.caches.descriptors import cached, cachedList
class SignatureStore(SQLBaseStore):
"""Persistence for event signatures and hashes"""
+ @cached(lru=True)
+ def get_event_reference_hash(self, event_id):
+ return self._get_event_reference_hashes_txn(event_id)
+
+ @cachedList(cached_method_name="get_event_reference_hash",
+ list_name="event_ids", num_args=1)
def get_event_reference_hashes(self, event_ids):
def f(txn):
- return [
- self._get_event_reference_hashes_txn(txn, ev)
- for ev in event_ids
- ]
+ return {
+ event_id: self._get_event_reference_hashes_txn(txn, event_id)
+ for event_id in event_ids
+ }
return self.runInteraction(
"get_event_reference_hashes",
@@ -41,15 +48,15 @@ class SignatureStore(SQLBaseStore):
hashes = yield self.get_event_reference_hashes(
event_ids
)
- hashes = [
- {
+ hashes = {
+ e_id: {
k: encode_base64(v) for k, v in h.items()
if k == "sha256"
}
- for h in hashes
- ]
+ for e_id, h in hashes.items()
+ }
- defer.returnValue(zip(event_ids, hashes))
+ defer.returnValue(hashes.items())
def _get_event_reference_hashes_txn(self, txn, event_id):
"""Get all the hashes for a given PDU.
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index d338dfcf0a..6c7481a728 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -16,16 +16,56 @@
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached
+from twisted.internet import defer, reactor
+
from canonicaljson import encode_canonical_json
+
+from collections import namedtuple
+
+import itertools
import logging
logger = logging.getLogger(__name__)
+_TransactionRow = namedtuple(
+ "_TransactionRow", (
+ "id", "transaction_id", "destination", "ts", "response_code",
+ "response_json",
+ )
+)
+
+_UpdateTransactionRow = namedtuple(
+ "_TransactionRow", (
+ "response_code", "response_json",
+ )
+)
+
+
class TransactionStore(SQLBaseStore):
"""A collection of queries for handling PDUs.
"""
+ def __init__(self, hs):
+ super(TransactionStore, self).__init__(hs)
+
+ # New transactions that are currently in flights
+ self.inflight_transactions = {}
+
+ # Newly delievered transactions that *weren't* persisted while in flight
+ self.new_delivered_transactions = {}
+
+ # Newly delivered transactions that *were* persisted while in flight
+ self.update_delivered_transactions = {}
+
+ self.last_transaction = {}
+
+ reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns)
+ hs.get_clock().looping_call(
+ self._persist_in_mem_txns,
+ 1000,
+ )
+
def get_received_txn_response(self, transaction_id, origin):
"""For an incoming transaction from a given origin, check if we have
already responded to it. If so, return the response code and response
@@ -108,17 +148,30 @@ class TransactionStore(SQLBaseStore):
list: A list of previous transaction ids.
"""
- return self.runInteraction(
- "prep_send_transaction",
- self._prep_send_transaction,
- transaction_id, destination, origin_server_ts
+ auto_id = self._transaction_id_gen.get_next()
+
+ txn_row = _TransactionRow(
+ id=auto_id,
+ transaction_id=transaction_id,
+ destination=destination,
+ ts=origin_server_ts,
+ response_code=0,
+ response_json=None,
)
- def _prep_send_transaction(self, txn, transaction_id, destination,
- origin_server_ts):
+ self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row
- next_id = self._transaction_id_gen.get_next()
+ prev_txn = self.last_transaction.get(destination)
+ if prev_txn:
+ return defer.succeed(prev_txn)
+ else:
+ return self.runInteraction(
+ "_get_prevs_txn",
+ self._get_prevs_txn,
+ destination,
+ )
+ def _get_prevs_txn(self, txn, destination):
# First we find out what the prev_txns should be.
# Since we know that we are only sending one transaction at a time,
# we can simply take the last one.
@@ -133,23 +186,6 @@ class TransactionStore(SQLBaseStore):
prev_txns = [r["transaction_id"] for r in results]
- # Actually add the new transaction to the sent_transactions table.
-
- self._simple_insert_txn(
- txn,
- table="sent_transactions",
- values={
- "id": next_id,
- "transaction_id": transaction_id,
- "destination": destination,
- "ts": origin_server_ts,
- "response_code": 0,
- "response_json": None,
- }
- )
-
- # TODO Update the tx id -> pdu id mapping
-
return prev_txns
def delivered_txn(self, transaction_id, destination, code, response_dict):
@@ -161,27 +197,23 @@ class TransactionStore(SQLBaseStore):
code (int)
response_json (str)
"""
- return self.runInteraction(
- "delivered_txn",
- self._delivered_txn,
- transaction_id, destination, code,
- buffer(encode_canonical_json(response_dict)),
- )
- def _delivered_txn(self, txn, transaction_id, destination,
- code, response_json):
- self._simple_update_one_txn(
- txn,
- table="sent_transactions",
- keyvalues={
- "transaction_id": transaction_id,
- "destination": destination,
- },
- updatevalues={
- "response_code": code,
- "response_json": None, # For now, don't persist response_json
- }
- )
+ txn_row = self.inflight_transactions.get(
+ destination, {}
+ ).pop(transaction_id, None)
+
+ self.last_transaction[destination] = transaction_id
+
+ if txn_row:
+ d = self.new_delivered_transactions.setdefault(destination, {})
+ d[transaction_id] = txn_row._replace(
+ response_code=code,
+ response_json=None, # For now, don't persist response
+ )
+ else:
+ d = self.update_delivered_transactions.setdefault(destination, {})
+ # For now, don't persist response
+ d[transaction_id] = _UpdateTransactionRow(code, None)
def get_transactions_after(self, transaction_id, destination):
"""Get all transactions after a given local transaction_id.
@@ -305,3 +337,48 @@ class TransactionStore(SQLBaseStore):
txn.execute(query, (self._clock.time_msec(),))
return self.cursor_to_dict(txn)
+
+ @defer.inlineCallbacks
+ def _persist_in_mem_txns(self):
+ try:
+ inflight = self.inflight_transactions
+ new_delivered = self.new_delivered_transactions
+ update_delivered = self.update_delivered_transactions
+
+ self.inflight_transactions = {}
+ self.new_delivered_transactions = {}
+ self.update_delivered_transactions = {}
+
+ full_rows = [
+ row._asdict()
+ for txn_map in itertools.chain(inflight.values(), new_delivered.values())
+ for row in txn_map.values()
+ ]
+
+ def f(txn):
+ if full_rows:
+ self._simple_insert_many_txn(
+ txn=txn,
+ table="sent_transactions",
+ values=full_rows
+ )
+
+ for dest, txn_map in update_delivered.items():
+ for txn_id, update_row in txn_map.items():
+ self._simple_update_one_txn(
+ txn,
+ table="sent_transactions",
+ keyvalues={
+ "transaction_id": txn_id,
+ "destination": dest,
+ },
+ updatevalues={
+ "response_code": update_row.response_code,
+ "response_json": None, # For now, don't persist response
+ }
+ )
+
+ if full_rows or update_delivered:
+ yield self.runInteraction("_persist_in_mem_txns", f)
+ except:
+ logger.exception("Failed to persist transactions!")
|