diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 4beb951b9f..02b1f06854 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -29,10 +29,14 @@ from .stream import StreamStore
from .transactions import TransactionStore
from .keys import KeyStore
from .event_federation import EventFederationStore
+from .pusher import PusherStore
+from .push_rule import PushRuleStore
from .media_repository import MediaRepositoryStore
+from .rejections import RejectionsStore
from .state import StateStore
from .signatures import SignatureStore
+from .filtering import FilteringStore
from syutil.base64util import decode_base64
from syutil.jsonutil import encode_canonical_json
@@ -40,7 +44,6 @@ from syutil.jsonutil import encode_canonical_json
from synapse.crypto.event_signing import compute_event_reference_hash
-import json
import logging
import os
@@ -60,13 +63,16 @@ SCHEMAS = [
"state",
"event_edges",
"event_signatures",
+ "pusher",
"media_repository",
+ "filtering",
+ "rejections",
]
# Remember to update this number every time an incompatible change is made to
# database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 11
+SCHEMA_VERSION = 12
class _RollbackButIsFineException(Exception):
@@ -82,6 +88,10 @@ class DataStore(RoomMemberStore, RoomStore,
DirectoryStore, KeyStore, StateStore, SignatureStore,
EventFederationStore,
MediaRepositoryStore,
+ RejectionsStore,
+ FilteringStore,
+ PusherStore,
+ PushRuleStore
):
def __init__(self, hs):
@@ -117,21 +127,147 @@ class DataStore(RoomMemberStore, RoomStore,
pass
@defer.inlineCallbacks
- def get_event(self, event_id, allow_none=False):
- events = yield self._get_events([event_id])
+ def get_event(self, event_id, check_redacted=True,
+ get_prev_content=False, allow_rejected=False,
+ allow_none=False):
+ """Get an event from the database by event_id.
+
+ Args:
+ event_id (str): The event_id of the event to fetch
+ check_redacted (bool): If True, check if event has been redacted
+ and redact it.
+            get_prev_content (bool): If True and the event is a state event,
+                include the previous state's content in the unsigned field.
+            allow_rejected (bool): If True, return rejected events.
+            allow_none (bool): If True, return None if no event found; if
+                False, raise an exception.
+
+ Returns:
+            Deferred: A FrozenEvent.
+ """
+ event = yield self.runInteraction(
+ "get_event", self._get_event_txn,
+ event_id,
+ check_redacted=check_redacted,
+ get_prev_content=get_prev_content,
+ allow_rejected=allow_rejected,
+ )
- if not events:
- if allow_none:
- defer.returnValue(None)
- else:
- raise RuntimeError("Could not find event %s" % (event_id,))
+ if not event and not allow_none:
+ raise RuntimeError("Could not find event %s" % (event_id,))
- defer.returnValue(events[0])
+ defer.returnValue(event)
@log_function
def _persist_event_txn(self, txn, event, context, backfilled,
stream_ordering=None, is_new_state=True,
current_state=None):
+
+        # Remove any existing cache entries for the event_id
+ self._get_event_cache.pop(event.event_id)
+
+ # We purposefully do this first since if we include a `current_state`
+ # key, we *want* to update the `current_state_events` table
+ if current_state:
+ txn.execute(
+ "DELETE FROM current_state_events WHERE room_id = ?",
+ (event.room_id,)
+ )
+
+ for s in current_state:
+ self._simple_insert_txn(
+ txn,
+ "current_state_events",
+ {
+ "event_id": s.event_id,
+ "room_id": s.room_id,
+ "type": s.type,
+ "state_key": s.state_key,
+ },
+ or_replace=True,
+ )
+
+ if event.is_state() and is_new_state:
+ if not backfilled and not context.rejected:
+ self._simple_insert_txn(
+ txn,
+ table="state_forward_extremities",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ },
+ or_replace=True,
+ )
+
+ for prev_state_id, _ in event.prev_state:
+ self._simple_delete_txn(
+ txn,
+ table="state_forward_extremities",
+ keyvalues={
+ "event_id": prev_state_id,
+ }
+ )
+
+ outlier = event.internal_metadata.is_outlier()
+
+ if not outlier:
+ self._store_state_groups_txn(txn, event, context)
+
+ self._update_min_depth_for_room_txn(
+ txn,
+ event.room_id,
+ event.depth
+ )
+
+ self._handle_prev_events(
+ txn,
+ outlier=outlier,
+ event_id=event.event_id,
+ prev_events=event.prev_events,
+ room_id=event.room_id,
+ )
+
+ have_persisted = self._simple_select_one_onecol_txn(
+ txn,
+ table="event_json",
+ keyvalues={"event_id": event.event_id},
+ retcol="event_id",
+ allow_none=True,
+ )
+
+ metadata_json = encode_canonical_json(
+ event.internal_metadata.get_dict()
+ )
+
+ # If we have already persisted this event, we don't need to do any
+ # more processing.
+        # The processing above must be done on every call to persist an
+        # event, since those steps might not have happened on previous
+        # calls. For example, we may be persisting an event that we had
+        # previously persisted as an outlier, but which no longer is one.
+ if have_persisted:
+ if not outlier:
+ sql = (
+ "UPDATE event_json SET internal_metadata = ?"
+ " WHERE event_id = ?"
+ )
+ txn.execute(
+ sql,
+ (metadata_json.decode("UTF-8"), event.event_id,)
+ )
+
+ sql = (
+ "UPDATE events SET outlier = 0"
+ " WHERE event_id = ?"
+ )
+ txn.execute(
+ sql,
+ (event.event_id,)
+ )
+ return
+
if event.type == EventTypes.Member:
self._store_room_member_txn(txn, event)
elif event.type == EventTypes.Feedback:
@@ -143,8 +279,6 @@ class DataStore(RoomMemberStore, RoomStore,
elif event.type == EventTypes.Redaction:
self._store_redaction(txn, event)
- outlier = event.internal_metadata.is_outlier()
-
event_dict = {
k: v
for k, v in event.get_dict().items()
@@ -154,10 +288,6 @@ class DataStore(RoomMemberStore, RoomStore,
]
}
- metadata_json = encode_canonical_json(
- event.internal_metadata.get_dict()
- )
-
self._simple_insert_txn(
txn,
table="event_json",
@@ -170,12 +300,16 @@ class DataStore(RoomMemberStore, RoomStore,
or_replace=True,
)
+ content = encode_canonical_json(
+ event.content
+ ).decode("UTF-8")
+
vals = {
"topological_ordering": event.depth,
"event_id": event.event_id,
"type": event.type,
"room_id": event.room_id,
- "content": json.dumps(event.get_dict()["content"]),
+ "content": content,
"processed": True,
"outlier": outlier,
"depth": event.depth,
@@ -195,7 +329,10 @@ class DataStore(RoomMemberStore, RoomStore,
"prev_events",
]
}
- vals["unrecognized_keys"] = json.dumps(unrec)
+
+ vals["unrecognized_keys"] = encode_canonical_json(
+ unrec
+ ).decode("UTF-8")
try:
self._simple_insert_txn(
@@ -213,38 +350,10 @@ class DataStore(RoomMemberStore, RoomStore,
)
raise _RollbackButIsFineException("_persist_event")
- self._handle_prev_events(
- txn,
- outlier=outlier,
- event_id=event.event_id,
- prev_events=event.prev_events,
- room_id=event.room_id,
- )
-
- if not outlier:
- self._store_state_groups_txn(txn, event, context)
-
- if current_state:
- txn.execute(
- "DELETE FROM current_state_events WHERE room_id = ?",
- (event.room_id,)
- )
+ if context.rejected:
+ self._store_rejections_txn(txn, event.event_id, context.rejected)
- for s in current_state:
- self._simple_insert_txn(
- txn,
- "current_state_events",
- {
- "event_id": s.event_id,
- "room_id": s.room_id,
- "type": s.type,
- "state_key": s.state_key,
- },
- or_replace=True,
- )
-
- is_state = hasattr(event, "state_key") and event.state_key is not None
- if is_state:
+ if event.is_state():
vals = {
"event_id": event.event_id,
"room_id": event.room_id,
@@ -252,6 +361,7 @@ class DataStore(RoomMemberStore, RoomStore,
"state_key": event.state_key,
}
+ # TODO: How does this work with backfilling?
if hasattr(event, "replaces_state"):
vals["prev_state"] = event.replaces_state
@@ -262,7 +372,7 @@ class DataStore(RoomMemberStore, RoomStore,
or_replace=True,
)
- if is_new_state:
+ if is_new_state and not context.rejected:
self._simple_insert_txn(
txn,
"current_state_events",
@@ -288,28 +398,6 @@ class DataStore(RoomMemberStore, RoomStore,
or_ignore=True,
)
- if not backfilled:
- self._simple_insert_txn(
- txn,
- table="state_forward_extremities",
- values={
- "event_id": event.event_id,
- "room_id": event.room_id,
- "type": event.type,
- "state_key": event.state_key,
- },
- or_replace=True,
- )
-
- for prev_state_id, _ in event.prev_state:
- self._simple_delete_txn(
- txn,
- table="state_forward_extremities",
- keyvalues={
- "event_id": prev_state_id,
- }
- )
-
for hash_alg, hash_base64 in event.hashes.items():
hash_bytes = decode_base64(hash_base64)
self._store_event_content_hash_txn(
@@ -340,14 +428,9 @@ class DataStore(RoomMemberStore, RoomStore,
txn, event.event_id, ref_alg, ref_hash_bytes
)
- if not outlier:
- self._update_min_depth_for_room_txn(
- txn,
- event.room_id,
- event.depth
- )
-
def _store_redaction(self, txn, event):
+ # invalidate the cache for the redacted event
+ self._get_event_cache.pop(event.redacts)
txn.execute(
"INSERT OR IGNORE INTO redactions "
"(event_id, redacts) VALUES (?,?)",
@@ -370,9 +453,12 @@ class DataStore(RoomMemberStore, RoomStore,
"redacted": del_sql,
}
- if event_type:
+ if event_type and state_key is not None:
sql += " AND s.type = ? AND s.state_key = ? "
args = (room_id, event_type, state_key)
+ elif event_type:
+ sql += " AND s.type = ?"
+ args = (room_id, event_type)
else:
args = (room_id, )
@@ -382,6 +468,41 @@ class DataStore(RoomMemberStore, RoomStore,
defer.returnValue(events)
@defer.inlineCallbacks
+ def get_room_name_and_aliases(self, room_id):
+ del_sql = (
+ "SELECT event_id FROM redactions WHERE redacts = e.event_id "
+ "LIMIT 1"
+ )
+
+ sql = (
+ "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
+ "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
+ "INNER JOIN state_events as s ON e.event_id = s.event_id "
+ "WHERE c.room_id = ? "
+ ) % {
+ "redacted": del_sql,
+ }
+
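+        # The room name lives in the ('m.room.name', '') state event; the
+        # aliases come from 'm.room.aliases' events, one per server (the
+        # state_key is the server name).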
+ sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')"
+ sql += " OR s.type = 'm.room.aliases')"
+ args = (room_id,)
+
+ results = yield self._execute_and_decode(sql, *args)
+
+ events = yield self._parse_events(results)
+
+ name = None
+ aliases = []
+
+ for e in events:
+ if e.type == 'm.room.name':
+ name = e.content['name']
+ elif e.type == 'm.room.aliases':
+ aliases.extend(e.content['aliases'])
+
+ defer.returnValue((name, aliases))
+
+ @defer.inlineCallbacks
def _get_min_token(self):
row = yield self._execute(
None,
@@ -417,6 +538,38 @@ class DataStore(RoomMemberStore, RoomStore,
],
)
+ def have_events(self, event_ids):
+ """Given a list of event ids, check if we have already processed them.
+
+ Returns:
+            dict: Has an entry for each event id we have already seen. Maps to
+ the rejected reason string if we rejected the event, else maps to
+ None.
+ """
+ if not event_ids:
+ return defer.succeed({})
+
+ def f(txn):
+ sql = (
+ "SELECT e.event_id, reason FROM events as e "
+ "LEFT JOIN rejections as r ON e.event_id = r.event_id "
+ "WHERE e.event_id = ?"
+ )
+
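+            # One query per event id; the LEFT JOIN gives the rejection
+            # reason, or NULL if the event was accepted.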
+ res = {}
+ for event_id in event_ids:
+ txn.execute(sql, (event_id,))
+ row = txn.fetchone()
+ if row:
+ _, rejected = row
+ res[event_id] = rejected
+
+ return res
+
+ return self.runInteraction(
+ "have_events", f,
+ )
+
def schema_path(schema):
""" Get a filesystem path for the named database schema
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index f660fc6eaf..be9934c66f 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -19,11 +19,12 @@ from synapse.events import FrozenEvent
from synapse.events.utils import prune_event
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext, LoggingContext
+from synapse.util.lrucache import LruCache
from twisted.internet import defer
import collections
-import json
+import simplejson as json
import sys
import time
@@ -77,6 +78,43 @@ class LoggingTransaction(object):
sql_logger.debug("[SQL time] {%s} %f", self.name, end - start)
+class PerformanceCounters(object):
+ def __init__(self):
+ self.current_counters = {}
+ self.previous_counters = {}
+
+ def update(self, key, start_time, end_time=None):
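+        # Timestamps are in milliseconds; callers pass time.time() * 1000.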
+ if end_time is None:
+ end_time = time.time() * 1000
+ duration = end_time - start_time
+ count, cum_time = self.current_counters.get(key, (0, 0))
+ count += 1
+ cum_time += duration
+ self.current_counters[key] = (count, cum_time)
+ return end_time
+
+ def interval(self, interval_duration, limit=3):
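+        # Summarise what share of interval_duration was spent under each key
+        # since the previous call, returning the top `limit` entries.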
+ counters = []
+ for name, (count, cum_time) in self.current_counters.items():
+ prev_count, prev_time = self.previous_counters.get(name, (0, 0))
+ counters.append((
+ (cum_time - prev_time) / interval_duration,
+ count - prev_count,
+ name
+ ))
+
+ self.previous_counters = dict(self.current_counters)
+
+ counters.sort(reverse=True)
+
+ top_n_counters = ", ".join(
+ "%s(%d): %.3f%%" % (name, count, 100 * ratio)
+ for ratio, count, name in counters[:limit]
+ )
+
+ return top_n_counters
+
+
class SQLBaseStore(object):
_TXN_ID = 0
@@ -85,6 +123,43 @@ class SQLBaseStore(object):
self._db_pool = hs.get_db_pool()
self._clock = hs.get_clock()
+ self._previous_txn_total_time = 0
+ self._current_txn_total_time = 0
+ self._previous_loop_ts = 0
+ self._txn_perf_counters = PerformanceCounters()
+ self._get_event_counters = PerformanceCounters()
+
+ self._get_event_cache = LruCache(hs.config.event_cache_size)
+
+ def start_profiling(self):
+ self._previous_loop_ts = self._clock.time_msec()
+
+ def loop():
+ curr = self._current_txn_total_time
+ prev = self._previous_txn_total_time
+ self._previous_txn_total_time = curr
+
+ time_now = self._clock.time_msec()
+ time_then = self._previous_loop_ts
+ self._previous_loop_ts = time_now
+
+ ratio = (curr - prev)/(time_now - time_then)
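+            # Fraction of the last interval's wall-clock time that was spent
+            # inside database transactions.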
+
+ top_three_counters = self._txn_perf_counters.interval(
+ time_now - time_then, limit=3
+ )
+
+ top_3_event_counters = self._get_event_counters.interval(
+ time_now - time_then, limit=3
+ )
+
+ logger.info(
+ "Total database time: %.3f%% {%s} {%s}",
+ ratio * 100, top_three_counters, top_3_event_counters
+ )
+
+ self._clock.looping_call(loop, 10000)
+
@defer.inlineCallbacks
def runInteraction(self, desc, func, *args, **kwargs):
"""Wraps the .runInteraction() method on the underlying db_pool."""
@@ -94,7 +169,7 @@ class SQLBaseStore(object):
with LoggingContext("runInteraction") as context:
current_context.copy_to(context)
start = time.time() * 1000
- txn_id = SQLBaseStore._TXN_ID
+ txn_id = self._TXN_ID
# We don't really need these to be unique, so lets stop it from
# growing really large.
@@ -114,6 +189,10 @@ class SQLBaseStore(object):
"[TXN END] {%s} %f",
name, end - start
)
+
+ self._current_txn_total_time += end - start
+ self._txn_perf_counters.update(desc, start, end)
+
with PreserveLoggingContext():
result = yield self._db_pool.runInteraction(
inner_func, *args, **kwargs
@@ -193,6 +272,50 @@ class SQLBaseStore(object):
txn.execute(sql, values.values())
return txn.lastrowid
+ def _simple_upsert(self, table, keyvalues, values):
+ """
+ Args:
+ table (str): The table to upsert into
+            keyvalues (dict): The unique key columns and their new values
+ values (dict): The nonunique columns and their new values
+ Returns: A deferred
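+
+        A hypothetical example, upserting a pusher's last_token keyed on
+        (app_id, pushkey):
+
+            self._simple_upsert(
+                "pushers",
+                keyvalues={"app_id": app_id, "pushkey": pushkey},
+                values={"last_token": last_token},
+            )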
+ """
+ return self.runInteraction(
+ "_simple_upsert",
+ self._simple_upsert_txn, table, keyvalues, values
+ )
+
+ def _simple_upsert_txn(self, txn, table, keyvalues, values):
+ # Try to update
+ sql = "UPDATE %s SET %s WHERE %s" % (
+ table,
+ ", ".join("%s = ?" % (k,) for k in values),
+ " AND ".join("%s = ?" % (k,) for k in keyvalues)
+ )
+ sqlargs = values.values() + keyvalues.values()
+ logger.debug(
+ "[SQL] %s Args=%s",
+ sql, sqlargs,
+ )
+
+ txn.execute(sql, sqlargs)
+ if txn.rowcount == 0:
+            # We didn't update any rows, so insert a new one
+ allvalues = {}
+ allvalues.update(keyvalues)
+ allvalues.update(values)
+
+ sql = "INSERT INTO %s (%s) VALUES (%s)" % (
+ table,
+ ", ".join(k for k in allvalues),
+ ", ".join("?" for _ in allvalues)
+ )
+ logger.debug(
+ "[SQL] %s Args=%s",
+                sql, allvalues.values(),
+ )
+ txn.execute(sql, allvalues.values())
+
def _simple_select_one(self, table, keyvalues, retcols,
allow_none=False):
"""Executes a SELECT query on the named table, which is expected to
@@ -344,8 +467,8 @@ class SQLBaseStore(object):
if updatevalues:
update_sql = "UPDATE %s SET %s WHERE %s" % (
table,
- ", ".join("%s = ?" % (k) for k in updatevalues),
- " AND ".join("%s = ?" % (k) for k in keyvalues)
+ ", ".join("%s = ?" % (k,) for k in updatevalues),
+ " AND ".join("%s = ?" % (k,) for k in keyvalues)
)
def func(txn):
@@ -458,10 +581,25 @@ class SQLBaseStore(object):
return [e for e in events if e]
def _get_event_txn(self, txn, event_id, check_redacted=True,
- get_prev_content=False):
+ get_prev_content=False, allow_rejected=False):
+
+ start_time = time.time() * 1000
+ update_counter = self._get_event_counters.update
+
+ try:
+ cache = self._get_event_cache.setdefault(event_id, {})
+ # Separate cache entries for each way to invoke _get_event_txn
+ return cache[(check_redacted, get_prev_content, allow_rejected)]
+ except KeyError:
+ pass
+ finally:
+ start_time = update_counter("event_cache", start_time)
+
sql = (
- "SELECT internal_metadata, json, r.event_id FROM event_json as e "
+ "SELECT e.internal_metadata, e.json, r.event_id, rej.reason "
+ "FROM event_json as e "
"LEFT JOIN redactions as r ON e.event_id = r.redacts "
+ "LEFT JOIN rejections as rej on rej.event_id = e.event_id "
"WHERE e.event_id = ? "
"LIMIT 1 "
)
@@ -473,20 +611,35 @@ class SQLBaseStore(object):
if not res:
return None
- internal_metadata, js, redacted = res
+ internal_metadata, js, redacted, rejected_reason = res
- return self._get_event_from_row_txn(
- txn, internal_metadata, js, redacted,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- )
+ start_time = update_counter("select_event", start_time)
+
+ if allow_rejected or not rejected_reason:
+ result = self._get_event_from_row_txn(
+ txn, internal_metadata, js, redacted,
+ check_redacted=check_redacted,
+ get_prev_content=get_prev_content,
+ )
+ cache[(check_redacted, get_prev_content, allow_rejected)] = result
+ return result
+ else:
+ return None
def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
check_redacted=True, get_prev_content=False):
+
+ start_time = time.time() * 1000
+ update_counter = self._get_event_counters.update
+
d = json.loads(js)
+ start_time = update_counter("decode_json", start_time)
+
internal_metadata = json.loads(internal_metadata)
+ start_time = update_counter("decode_internal", start_time)
ev = FrozenEvent(d, internal_metadata_dict=internal_metadata)
+ start_time = update_counter("build_frozen_event", start_time)
if check_redacted and redacted:
ev = prune_event(ev)
@@ -502,6 +655,7 @@ class SQLBaseStore(object):
if because:
ev.unsigned["redacted_because"] = because
+ start_time = update_counter("redact_event", start_time)
if get_prev_content and "replaces_state" in ev.unsigned:
prev = self._get_event_txn(
@@ -511,6 +665,7 @@ class SQLBaseStore(object):
)
if prev:
ev.unsigned["prev_content"] = prev.get_dict()["content"]
+ start_time = update_counter("get_prev_content", start_time)
return ev
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
new file mode 100644
index 0000000000..457a11fd02
--- /dev/null
+++ b/synapse/storage/filtering.py
@@ -0,0 +1,63 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from ._base import SQLBaseStore
+
+import simplejson as json
+
+
+class FilteringStore(SQLBaseStore):
+ @defer.inlineCallbacks
+ def get_user_filter(self, user_localpart, filter_id):
+ def_json = yield self._simple_select_one_onecol(
+ table="user_filters",
+ keyvalues={
+ "user_id": user_localpart,
+ "filter_id": filter_id,
+ },
+ retcol="filter_json",
+ allow_none=False,
+ )
+
+ defer.returnValue(json.loads(def_json))
+
+ def add_user_filter(self, user_localpart, user_filter):
+ def_json = json.dumps(user_filter)
+
+ # Need an atomic transaction to SELECT the maximal ID so far then
+ # INSERT a new one
+ def _do_txn(txn):
+ sql = (
+ "SELECT MAX(filter_id) FROM user_filters "
+ "WHERE user_id = ?"
+ )
+ txn.execute(sql, (user_localpart,))
+ max_id = txn.fetchone()[0]
+ if max_id is None:
+ filter_id = 0
+ else:
+ filter_id = max_id + 1
+
+ sql = (
+ "INSERT INTO user_filters (user_id, filter_id, filter_json)"
+ "VALUES(?, ?, ?)"
+ )
+ txn.execute(sql, (user_localpart, filter_id, def_json))
+
+ return filter_id
+
+ return self.runInteraction("add_user_filter", _do_txn)
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
new file mode 100644
index 0000000000..ae46b39cc1
--- /dev/null
+++ b/synapse/storage/push_rule.py
@@ -0,0 +1,218 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+
+from ._base import SQLBaseStore, Table
+from twisted.internet import defer
+
+import logging
+import copy
+import simplejson as json
+
+logger = logging.getLogger(__name__)
+
+
+class PushRuleStore(SQLBaseStore):
+ @defer.inlineCallbacks
+ def get_push_rules_for_user_name(self, user_name):
+ sql = (
+ "SELECT "+",".join(PushRuleTable.fields)+" "
+ "FROM "+PushRuleTable.table_name+" "
+ "WHERE user_name = ? "
+ "ORDER BY priority_class DESC, priority DESC"
+ )
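+        # Rows come back highest priority first (by class, then priority).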
+ rows = yield self._execute(None, sql, user_name)
+
+ dicts = []
+ for r in rows:
+ d = {}
+ for i, f in enumerate(PushRuleTable.fields):
+ d[f] = r[i]
+ dicts.append(d)
+
+ defer.returnValue(dicts)
+
+ @defer.inlineCallbacks
+ def add_push_rule(self, before, after, **kwargs):
+ vals = copy.copy(kwargs)
+ if 'conditions' in vals:
+ vals['conditions'] = json.dumps(vals['conditions'])
+ if 'actions' in vals:
+ vals['actions'] = json.dumps(vals['actions'])
+ # we could check the rest of the keys are valid column names
+ # but sqlite will do that anyway so I think it's just pointless.
+ if 'id' in vals:
+ del vals['id']
+
+ if before or after:
+ ret = yield self.runInteraction(
+ "_add_push_rule_relative_txn",
+ self._add_push_rule_relative_txn,
+ before=before,
+ after=after,
+ **vals
+ )
+ defer.returnValue(ret)
+ else:
+ ret = yield self.runInteraction(
+ "_add_push_rule_highest_priority_txn",
+ self._add_push_rule_highest_priority_txn,
+ **vals
+ )
+ defer.returnValue(ret)
+
+ def _add_push_rule_relative_txn(self, txn, user_name, **kwargs):
+ after = None
+ relative_to_rule = None
+ if 'after' in kwargs and kwargs['after']:
+ after = kwargs['after']
+ relative_to_rule = after
+ if 'before' in kwargs and kwargs['before']:
+ relative_to_rule = kwargs['before']
+
+ # get the priority of the rule we're inserting after/before
+ sql = (
+ "SELECT priority_class, priority FROM ? "
+ "WHERE user_name = ? and rule_id = ?" % (PushRuleTable.table_name,)
+ )
+ txn.execute(sql, (user_name, relative_to_rule))
+ res = txn.fetchall()
+ if not res:
+ raise RuleNotFoundException(
+ "before/after rule not found: %s" % (relative_to_rule,)
+ )
+ priority_class, base_rule_priority = res[0]
+
+ if 'priority_class' in kwargs and kwargs['priority_class'] != priority_class:
+ raise InconsistentRuleException(
+ "Given priority class does not match class of relative rule"
+ )
+
+ new_rule = copy.copy(kwargs)
+ if 'before' in new_rule:
+ del new_rule['before']
+ if 'after' in new_rule:
+ del new_rule['after']
+ new_rule['priority_class'] = priority_class
+ new_rule['user_name'] = user_name
+
+ # check if the priority before/after is free
+ new_rule_priority = base_rule_priority
+ if after:
+ new_rule_priority -= 1
+ else:
+ new_rule_priority += 1
+
+ new_rule['priority'] = new_rule_priority
+
+ sql = (
+ "SELECT COUNT(*) FROM " + PushRuleTable.table_name +
+ " WHERE user_name = ? AND priority_class = ? AND priority = ?"
+ )
+ txn.execute(sql, (user_name, priority_class, new_rule_priority))
+ res = txn.fetchall()
+ num_conflicting = res[0][0]
+
+ # if there are conflicting rules, bump everything
+ if num_conflicting:
+ sql = "UPDATE "+PushRuleTable.table_name+" SET priority = priority "
+ if after:
+ sql += "-1"
+ else:
+ sql += "+1"
+ sql += " WHERE user_name = ? AND priority_class = ? AND priority "
+ if after:
+ sql += "<= ?"
+ else:
+ sql += ">= ?"
+
+ txn.execute(sql, (user_name, priority_class, new_rule_priority))
+
+ # now insert the new rule
+ sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" ("
+ sql += ",".join(new_rule.keys())+") VALUES ("
+ sql += ", ".join(["?" for _ in new_rule.keys()])+")"
+
+ txn.execute(sql, new_rule.values())
+
+ def _add_push_rule_highest_priority_txn(self, txn, user_name,
+ priority_class, **kwargs):
+ # find the highest priority rule in that class
+ sql = (
+ "SELECT COUNT(*), MAX(priority) FROM " + PushRuleTable.table_name +
+ " WHERE user_name = ? and priority_class = ?"
+ )
+ txn.execute(sql, (user_name, priority_class))
+ res = txn.fetchall()
+ (how_many, highest_prio) = res[0]
+
+ new_prio = 0
+ if how_many > 0:
+ new_prio = highest_prio + 1
+
+ # and insert the new rule
+ new_rule = copy.copy(kwargs)
+ if 'id' in new_rule:
+ del new_rule['id']
+ new_rule['user_name'] = user_name
+ new_rule['priority_class'] = priority_class
+ new_rule['priority'] = new_prio
+
+ sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" ("
+ sql += ",".join(new_rule.keys())+") VALUES ("
+ sql += ", ".join(["?" for _ in new_rule.keys()])+")"
+
+ txn.execute(sql, new_rule.values())
+
+ @defer.inlineCallbacks
+ def delete_push_rule(self, user_name, rule_id):
+ """
+ Delete a push rule. Args specify the row to be deleted and can be
+        any of the columns in the push_rules table, but below are the
+        standard ones.
+
+ Args:
+ user_name (str): The matrix ID of the push rule owner
+ rule_id (str): The rule_id of the rule to be deleted
+ """
+ yield self._simple_delete_one(
+ PushRuleTable.table_name,
+ {'user_name': user_name, 'rule_id': rule_id}
+ )
+
+
+class RuleNotFoundException(Exception):
+ pass
+
+
+class InconsistentRuleException(Exception):
+ pass
+
+
+class PushRuleTable(Table):
+ table_name = "push_rules"
+
+ fields = [
+ "id",
+ "user_name",
+ "rule_id",
+ "priority_class",
+ "priority",
+ "conditions",
+ "actions",
+ ]
+
+ EntryType = collections.namedtuple("PushRuleEntry", fields)
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
new file mode 100644
index 0000000000..e2a662a6c7
--- /dev/null
+++ b/synapse/storage/pusher.py
@@ -0,0 +1,173 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+
+from ._base import SQLBaseStore, Table
+from twisted.internet import defer
+
+from synapse.api.errors import StoreError
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class PusherStore(SQLBaseStore):
+ @defer.inlineCallbacks
+ def get_pushers_by_app_id_and_pushkey(self, app_id_and_pushkey):
+ sql = (
+ "SELECT id, user_name, kind, profile_tag, app_id,"
+ "app_display_name, device_display_name, pushkey, ts, data, "
+ "last_token, last_success, failing_since "
+ "FROM pushers "
+ "WHERE app_id = ? AND pushkey = ?"
+ )
+
+ rows = yield self._execute(
+ None, sql, app_id_and_pushkey[0], app_id_and_pushkey[1]
+ )
+
+ ret = [
+ {
+ "id": r[0],
+ "user_name": r[1],
+ "kind": r[2],
+ "profile_tag": r[3],
+ "app_id": r[4],
+ "app_display_name": r[5],
+ "device_display_name": r[6],
+ "pushkey": r[7],
+ "pushkey_ts": r[8],
+ "data": r[9],
+ "last_token": r[10],
+ "last_success": r[11],
+ "failing_since": r[12]
+ }
+ for r in rows
+ ]
+
+ defer.returnValue(ret[0])
+
+ @defer.inlineCallbacks
+ def get_all_pushers(self):
+ sql = (
+ "SELECT id, user_name, kind, profile_tag, app_id,"
+ "app_display_name, device_display_name, pushkey, ts, data, "
+ "last_token, last_success, failing_since "
+ "FROM pushers"
+ )
+
+ rows = yield self._execute(None, sql)
+
+ ret = [
+ {
+ "id": r[0],
+ "user_name": r[1],
+ "kind": r[2],
+ "profile_tag": r[3],
+ "app_id": r[4],
+ "app_display_name": r[5],
+ "device_display_name": r[6],
+ "pushkey": r[7],
+ "pushkey_ts": r[8],
+ "data": r[9],
+ "last_token": r[10],
+ "last_success": r[11],
+ "failing_since": r[12]
+ }
+ for r in rows
+ ]
+
+ defer.returnValue(ret)
+
+ @defer.inlineCallbacks
+ def add_pusher(self, user_name, profile_tag, kind, app_id,
+ app_display_name, device_display_name,
+ pushkey, pushkey_ts, lang, data):
+ try:
+ yield self._simple_upsert(
+ PushersTable.table_name,
+ dict(
+ app_id=app_id,
+ pushkey=pushkey,
+ ),
+ dict(
+ user_name=user_name,
+ kind=kind,
+ profile_tag=profile_tag,
+ app_display_name=app_display_name,
+ device_display_name=device_display_name,
+ ts=pushkey_ts,
+ lang=lang,
+ data=data
+ ))
+ except Exception as e:
+ logger.error("create_pusher with failed: %s", e)
+ raise StoreError(500, "Problem creating pusher.")
+
+ @defer.inlineCallbacks
+ def delete_pusher_by_app_id_pushkey(self, app_id, pushkey):
+ yield self._simple_delete_one(
+ PushersTable.table_name,
+ dict(app_id=app_id, pushkey=pushkey)
+ )
+
+ @defer.inlineCallbacks
+ def update_pusher_last_token(self, user_name, pushkey, last_token):
+ yield self._simple_update_one(
+ PushersTable.table_name,
+ {'user_name': user_name, 'pushkey': pushkey},
+ {'last_token': last_token}
+ )
+
+ @defer.inlineCallbacks
+ def update_pusher_last_token_and_success(self, user_name, pushkey,
+ last_token, last_success):
+ yield self._simple_update_one(
+ PushersTable.table_name,
+ {'user_name': user_name, 'pushkey': pushkey},
+ {'last_token': last_token, 'last_success': last_success}
+ )
+
+ @defer.inlineCallbacks
+ def update_pusher_failing_since(self, user_name, pushkey, failing_since):
+ yield self._simple_update_one(
+ PushersTable.table_name,
+ {'user_name': user_name, 'pushkey': pushkey},
+ {'failing_since': failing_since}
+ )
+
+
+class PushersTable(Table):
+ table_name = "pushers"
+
+ fields = [
+ "id",
+ "user_name",
+ "kind",
+ "profile_tag",
+ "app_id",
+ "app_display_name",
+ "device_display_name",
+ "pushkey",
+ "pushkey_ts",
+ "data",
+ "last_token",
+ "last_success",
+ "failing_since"
+ ]
+
+ EntryType = collections.namedtuple("PusherEntry", fields)
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 75dffa4db2..029b07cc66 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -122,7 +122,8 @@ class RegistrationStore(SQLBaseStore):
def _query_for_auth(self, txn, token):
sql = (
- "SELECT users.name, users.admin, access_tokens.device_id"
+ "SELECT users.name, users.admin,"
+ " access_tokens.device_id, access_tokens.id as token_id"
" FROM users"
" INNER JOIN access_tokens on users.id = access_tokens.user_id"
" WHERE token = ?"
diff --git a/synapse/storage/rejections.py b/synapse/storage/rejections.py
new file mode 100644
index 0000000000..4e1a9a2783
--- /dev/null
+++ b/synapse/storage/rejections.py
@@ -0,0 +1,43 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import SQLBaseStore
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class RejectionsStore(SQLBaseStore):
+ def _store_rejections_txn(self, txn, event_id, reason):
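+        # Record the reason an event was rejected; last_check is the time we
+        # stored (or last checked) the rejection.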
+ self._simple_insert_txn(
+ txn,
+ table="rejections",
+ values={
+ "event_id": event_id,
+ "reason": reason,
+ "last_check": self._clock.time_msec(),
+ }
+ )
+
+ def get_rejection_reason(self, event_id):
+ return self._simple_select_one_onecol(
+ table="rejections",
+ retcol="reason",
+ keyvalues={
+ "event_id": event_id,
+ },
+ allow_none=True,
+ )
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 978b2c4a48..750b17a45f 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -58,13 +58,6 @@ class RoomStore(SQLBaseStore):
logger.error("store_room with room_id=%s failed: %s", room_id, e)
raise StoreError(500, "Problem creating room.")
- def store_room_config(self, room_id, visibility):
- return self._simple_update_one(
- table=RoomsTable.table_name,
- keyvalues={"room_id": room_id},
- updatevalues={"is_public": visibility}
- )
-
def get_room(self, room_id):
"""Retrieve a room.
@@ -89,38 +82,45 @@ class RoomStore(SQLBaseStore):
"topic" key if one is set, and a "name" key if one is set
"""
- topic_subquery = (
- "SELECT topics.event_id as event_id, "
- "topics.room_id as room_id, topic "
- "FROM topics "
- "INNER JOIN current_state_events as c "
- "ON c.event_id = topics.event_id "
- )
+ def f(txn):
+ topic_subquery = (
+ "SELECT topics.event_id as event_id, "
+ "topics.room_id as room_id, topic "
+ "FROM topics "
+ "INNER JOIN current_state_events as c "
+ "ON c.event_id = topics.event_id "
+ )
- name_subquery = (
- "SELECT room_names.event_id as event_id, "
- "room_names.room_id as room_id, name "
- "FROM room_names "
- "INNER JOIN current_state_events as c "
- "ON c.event_id = room_names.event_id "
- )
+ name_subquery = (
+ "SELECT room_names.event_id as event_id, "
+ "room_names.room_id as room_id, name "
+ "FROM room_names "
+ "INNER JOIN current_state_events as c "
+ "ON c.event_id = room_names.event_id "
+ )
- # We use non printing ascii character US () as a seperator
- sql = (
- "SELECT r.room_id, n.name, t.topic, "
- "group_concat(a.room_alias, '') "
- "FROM rooms AS r "
- "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id "
- "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id "
- "INNER JOIN room_aliases AS a ON a.room_id = r.room_id "
- "WHERE r.is_public = ? "
- "GROUP BY r.room_id "
- ) % {
- "topic": topic_subquery,
- "name": name_subquery,
- }
-
- rows = yield self._execute(None, sql, is_public)
+            # We use the non-printing ASCII character US () as a separator
+ sql = (
+ "SELECT r.room_id, n.name, t.topic, "
+ "group_concat(a.room_alias, '') "
+ "FROM rooms AS r "
+ "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id "
+ "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id "
+ "INNER JOIN room_aliases AS a ON a.room_id = r.room_id "
+ "WHERE r.is_public = ? "
+ "GROUP BY r.room_id "
+ ) % {
+ "topic": topic_subquery,
+ "name": name_subquery,
+ }
+
+ c = txn.execute(sql, (is_public,))
+
+ return c.fetchall()
+
+ rows = yield self.runInteraction(
+ "get_rooms", f
+ )
ret = [
{
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index e59e65529b..779f9ce544 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -20,6 +20,7 @@ from collections import namedtuple
from ._base import SQLBaseStore
from synapse.api.constants import Membership
+from synapse.types import UserID
import logging
@@ -34,12 +35,17 @@ RoomsForUser = namedtuple(
class RoomMemberStore(SQLBaseStore):
+ def __init__(self, *args, **kw):
+ super(RoomMemberStore, self).__init__(*args, **kw)
+
+ self._user_rooms_cache = {}
+
def _store_room_member_txn(self, txn, event):
"""Store a room member in the database.
"""
try:
target_user_id = event.state_key
- domain = self.hs.parse_userid(target_user_id).domain
+ domain = UserID.from_string(target_user_id).domain
except:
logger.exception(
"Failed to parse target_user_id=%s", target_user_id
@@ -84,7 +90,7 @@ class RoomMemberStore(SQLBaseStore):
for e in member_events:
try:
joined_domains.add(
- self.hs.parse_userid(e.state_key).domain
+ UserID.from_string(e.state_key).domain
)
except:
# FIXME: How do we deal with invalid user ids in the db?
@@ -97,6 +103,8 @@ class RoomMemberStore(SQLBaseStore):
txn.execute(sql, (event.room_id, domain))
+ self.invalidate_rooms_for_user(target_user_id)
+
@defer.inlineCallbacks
def get_room_member(self, user_id, room_id):
"""Retrieve the current state of a room member.
@@ -239,28 +247,53 @@ class RoomMemberStore(SQLBaseStore):
results = self._parse_events_txn(txn, rows)
return results
+ # TODO(paul): Create a nice @cached decorator to do this
+ # @cached
+ # def get_foo(...)
+ # ...
+ # invalidate_foo = get_foo.invalidator
+
+ @defer.inlineCallbacks
+ def get_rooms_for_user(self, user_id):
+ # TODO(paul): put some performance counters in here so we can easily
+ # track what impact this cache is having
+ if user_id in self._user_rooms_cache:
+ defer.returnValue(self._user_rooms_cache[user_id])
+
+ rooms = yield self.get_rooms_for_user_where_membership_is(
+ user_id, membership_list=[Membership.JOIN],
+ )
+
+ # TODO(paul): Consider applying a maximum size; just evict things at
+ # random, or consider LRU?
+
+ self._user_rooms_cache[user_id] = rooms
+ defer.returnValue(rooms)
+
+ def invalidate_rooms_for_user(self, user_id):
+ if user_id in self._user_rooms_cache:
+ del self._user_rooms_cache[user_id]
+
+ @defer.inlineCallbacks
def user_rooms_intersect(self, user_id_list):
""" Checks whether all the users whose IDs are given in a list share a
room.
+
+ This is a "hot path" function that's called a lot, e.g. by presence for
+ generating the event stream. As such, it is implemented locally by
+ wrapping logic around heavily-cached database queries.
"""
- def interaction(txn):
- user_list_clause = " OR ".join(["m.user_id = ?"] * len(user_id_list))
- sql = (
- "SELECT m.room_id FROM room_memberships as m "
- "INNER JOIN current_state_events as c "
- "ON m.event_id = c.event_id "
- "WHERE m.membership = 'join' "
- "AND (%(clause)s) "
- # TODO(paul): We've got duplicate rows in the database somewhere
- # so we have to DISTINCT m.user_id here
- "GROUP BY m.room_id HAVING COUNT(DISTINCT m.user_id) = ?"
- ) % {"clause": user_list_clause}
-
- args = list(user_id_list)
- args.append(len(user_id_list))
+ if len(user_id_list) < 2:
+ defer.returnValue(True)
- txn.execute(sql, args)
+ deferreds = [self.get_rooms_for_user(u) for u in user_id_list]
+
+ results = yield defer.DeferredList(deferreds)
+
+ # A list of sets of strings giving room IDs for each user
+ room_id_lists = [set([r.room_id for r in result[1]]) for result in results]
- return len(txn.fetchall()) > 0
+        # There isn't a set.intersection(*list_of_sets) helper, so pop the
+        # first set and intersect the remaining ones with it
+ ret = len(room_id_lists.pop(0).intersection(*room_id_lists)) > 0
- return self.runInteraction("user_rooms_intersect", interaction)
+ defer.returnValue(ret)
diff --git a/synapse/storage/schema/delta/v12.sql b/synapse/storage/schema/delta/v12.sql
new file mode 100644
index 0000000000..b87ef1fe79
--- /dev/null
+++ b/synapse/storage/schema/delta/v12.sql
@@ -0,0 +1,67 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS rejections(
+ event_id TEXT NOT NULL,
+ reason TEXT NOT NULL,
+ last_check TEXT NOT NULL,
+ CONSTRAINT ev_id UNIQUE (event_id) ON CONFLICT REPLACE
+);
+
+-- Push notification endpoints that users have configured
+CREATE TABLE IF NOT EXISTS pushers (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_name TEXT NOT NULL,
+ profile_tag varchar(32) NOT NULL,
+ kind varchar(8) NOT NULL,
+ app_id varchar(64) NOT NULL,
+ app_display_name varchar(64) NOT NULL,
+ device_display_name varchar(128) NOT NULL,
+ pushkey blob NOT NULL,
+ ts BIGINT NOT NULL,
+ lang varchar(8),
+ data blob,
+ last_token TEXT,
+ last_success BIGINT,
+ failing_since BIGINT,
+ FOREIGN KEY(user_name) REFERENCES users(name),
+ UNIQUE (app_id, pushkey)
+);
+
+CREATE TABLE IF NOT EXISTS push_rules (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_name TEXT NOT NULL,
+ rule_id TEXT NOT NULL,
+ priority_class TINYINT NOT NULL,
+ priority INTEGER NOT NULL DEFAULT 0,
+ conditions TEXT NOT NULL,
+ actions TEXT NOT NULL,
+ UNIQUE(user_name, rule_id)
+);
+
+CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name);
+
+CREATE TABLE IF NOT EXISTS user_filters(
+ user_id TEXT,
+ filter_id INTEGER,
+ filter_json TEXT,
+ FOREIGN KEY(user_id) REFERENCES users(id)
+);
+
+CREATE INDEX IF NOT EXISTS user_filters_by_user_id_filter_id ON user_filters(
+ user_id, filter_id
+);
+
+PRAGMA user_version = 12;
diff --git a/synapse/storage/schema/filtering.sql b/synapse/storage/schema/filtering.sql
new file mode 100644
index 0000000000..beb39ca201
--- /dev/null
+++ b/synapse/storage/schema/filtering.sql
@@ -0,0 +1,24 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS user_filters(
+ user_id TEXT,
+ filter_id INTEGER,
+ filter_json TEXT,
+ FOREIGN KEY(user_id) REFERENCES users(id)
+);
+
+CREATE INDEX IF NOT EXISTS user_filters_by_user_id_filter_id ON user_filters(
+ user_id, filter_id
+);
diff --git a/synapse/storage/schema/pusher.sql b/synapse/storage/schema/pusher.sql
new file mode 100644
index 0000000000..3735b11547
--- /dev/null
+++ b/synapse/storage/schema/pusher.sql
@@ -0,0 +1,46 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-- Push notification endpoints that users have configured
+CREATE TABLE IF NOT EXISTS pushers (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_name TEXT NOT NULL,
+ profile_tag varchar(32) NOT NULL,
+ kind varchar(8) NOT NULL,
+ app_id varchar(64) NOT NULL,
+ app_display_name varchar(64) NOT NULL,
+ device_display_name varchar(128) NOT NULL,
+ pushkey blob NOT NULL,
+ ts BIGINT NOT NULL,
+ lang varchar(8),
+ data blob,
+ last_token TEXT,
+ last_success BIGINT,
+ failing_since BIGINT,
+ FOREIGN KEY(user_name) REFERENCES users(name),
+ UNIQUE (app_id, pushkey)
+);
+
+CREATE TABLE IF NOT EXISTS push_rules (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_name TEXT NOT NULL,
+ rule_id TEXT NOT NULL,
+ priority_class TINYINT NOT NULL,
+ priority INTEGER NOT NULL DEFAULT 0,
+ conditions TEXT NOT NULL,
+ actions TEXT NOT NULL,
+ UNIQUE(user_name, rule_id)
+);
+
+CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name);
diff --git a/synapse/storage/schema/rejections.sql b/synapse/storage/schema/rejections.sql
new file mode 100644
index 0000000000..bd2a8b1bb5
--- /dev/null
+++ b/synapse/storage/schema/rejections.sql
@@ -0,0 +1,21 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS rejections(
+ event_id TEXT NOT NULL,
+ reason TEXT NOT NULL,
+ last_check TEXT NOT NULL,
+ CONSTRAINT ev_id UNIQUE (event_id) ON CONFLICT REPLACE
+);
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 5327517704..71db16d0e5 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -78,12 +78,6 @@ class StateStore(SQLBaseStore):
f,
)
- def store_state_groups(self, event):
- return self.runInteraction(
- "store_state_groups",
- self._store_state_groups_txn, event
- )
-
def _store_state_groups_txn(self, txn, event, context):
if context.current_state is None:
return
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index bedc3c6c52..3ccb6f8a61 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -39,6 +39,8 @@ from ._base import SQLBaseStore
from synapse.api.errors import SynapseError
from synapse.util.logutils import log_function
+from collections import namedtuple
+
import logging
@@ -52,91 +54,79 @@ _STREAM_TOKEN = "stream"
_TOPOLOGICAL_TOKEN = "topological"
-def _parse_stream_token(string):
- try:
- if string[0] != 's':
- raise
- return int(string[1:])
- except:
- raise SynapseError(400, "Invalid token")
-
-
-def _parse_topological_token(string):
- try:
- if string[0] != 't':
- raise
- parts = string[1:].split('-', 1)
- return (int(parts[0]), int(parts[1]))
- except:
- raise SynapseError(400, "Invalid token")
-
-
-def is_stream_token(string):
- try:
- _parse_stream_token(string)
- return True
- except:
- return False
-
-
-def is_topological_token(string):
- try:
- _parse_topological_token(string)
- return True
- except:
- return False
-
-
-def _get_token_bound(token, comparison):
- try:
- s = _parse_stream_token(token)
- return "%s %s %d" % ("stream_ordering", comparison, s)
- except:
- pass
-
- try:
- top, stream = _parse_topological_token(token)
- return "%s %s %d AND %s %s %d" % (
- "topological_ordering", comparison, top,
- "stream_ordering", comparison, stream,
- )
- except:
- pass
-
- raise SynapseError(400, "Invalid token")
-
-
-class StreamStore(SQLBaseStore):
- @log_function
- def get_room_events(self, user_id, from_key, to_key, room_id, limit=0,
- direction='f', with_feedback=False):
- # We deal with events request in two different ways depending on if
- # this looks like an /events request or a pagination request.
- is_events = (
- direction == 'f'
- and user_id
- and is_stream_token(from_key)
- and to_key and is_stream_token(to_key)
- )
+class _StreamToken(namedtuple("_StreamToken", "topological stream")):
+ """Tokens are positions between events. The token "s1" comes after event 1.
+
+ s0 s1
+ | |
+ [0] V [1] V [2]
+
+ Tokens can either be a point in the live event stream or a cursor going
+ through historic events.
+
+ When traversing the live event stream events are ordered by when they
+ arrived at the homeserver.
+
+ When traversing historic events the events are ordered by their depth in
+ the event graph "topological_ordering" and then by when they arrived at the
+ homeserver "stream_ordering".
+
+ Live tokens start with an "s" followed by the "stream_ordering" id of the
+ event it comes after. Historic tokens start with a "t" followed by the
+ "topological_ordering" id of the event it comes after, follewed by "-",
+ followed by the "stream_ordering" id of the event it comes after.
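+
+    For example, parse("t10-4") yields a token with topological_ordering 10
+    and stream_ordering 4; str() of that token gives back "t10-4".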
+ """
+ __slots__ = []
+
+ @classmethod
+ def parse(cls, string):
+ try:
+ if string[0] == 's':
+ return cls(topological=None, stream=int(string[1:]))
+ if string[0] == 't':
+ parts = string[1:].split('-', 1)
+ return cls(topological=int(parts[0]), stream=int(parts[1]))
+ except:
+ pass
+ raise SynapseError(400, "Invalid token %r" % (string,))
+
+ @classmethod
+ def parse_stream_token(cls, string):
+ try:
+ if string[0] == 's':
+ return cls(topological=None, stream=int(string[1:]))
+ except:
+ pass
+ raise SynapseError(400, "Invalid token %r" % (string,))
+
+ def __str__(self):
+ if self.topological is not None:
+ return "t%d-%d" % (self.topological, self.stream)
+ else:
+ return "s%d" % (self.stream,)
- if is_events:
- return self.get_room_events_stream(
- user_id=user_id,
- from_key=from_key,
- to_key=to_key,
- room_id=room_id,
- limit=limit,
- with_feedback=with_feedback,
+ def lower_bound(self):
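+        # SQL fragment matching rows strictly after this token: the token is
+        # the lower bound of the range.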
+ if self.topological is None:
+ return "(%d < %s)" % (self.stream, "stream_ordering")
+ else:
+ return "(%d < %s OR (%d == %s AND %d < %s))" % (
+ self.topological, "topological_ordering",
+ self.topological, "topological_ordering",
+ self.stream, "stream_ordering",
)
+
+ def upper_bound(self):
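+        # SQL fragment matching rows at or before this token: the token is
+        # the inclusive upper bound of the range.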
+ if self.topological is None:
+ return "(%d >= %s)" % (self.stream, "stream_ordering")
else:
- return self.paginate_room_events(
- from_key=from_key,
- to_key=to_key,
- room_id=room_id,
- limit=limit,
- with_feedback=with_feedback,
+ return "(%d > %s OR (%d == %s AND %d >= %s))" % (
+ self.topological, "topological_ordering",
+ self.topological, "topological_ordering",
+ self.stream, "stream_ordering",
)
+
+class StreamStore(SQLBaseStore):
@log_function
def get_room_events_stream(self, user_id, from_key, to_key, room_id,
limit=0, with_feedback=False):
@@ -162,8 +152,8 @@ class StreamStore(SQLBaseStore):
limit = MAX_STREAM_SIZE
# From and to keys should be integers from ordering.
- from_id = _parse_stream_token(from_key)
- to_id = _parse_stream_token(to_key)
+ from_id = _StreamToken.parse_stream_token(from_key)
+ to_id = _StreamToken.parse_stream_token(to_key)
if from_key == to_key:
return defer.succeed(([], to_key))
@@ -181,7 +171,7 @@ class StreamStore(SQLBaseStore):
}
def f(txn):
- txn.execute(sql, (user_id, user_id, from_id, to_id,))
+ txn.execute(sql, (user_id, user_id, from_id.stream, to_id.stream,))
rows = self.cursor_to_dict(txn)
@@ -191,8 +181,11 @@ class StreamStore(SQLBaseStore):
get_prev_content=True
)
+ self._set_before_and_after(ret, rows)
+
if rows:
key = "s%d" % max([r["stream_ordering"] for r in rows])
+
else:
# Assume we didn't get anything because there was nothing to
# get.
@@ -211,17 +204,21 @@ class StreamStore(SQLBaseStore):
# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
- from_comp = '<=' if direction == 'b' else '>'
- to_comp = '>' if direction == 'b' else '<='
- order = "DESC" if direction == 'b' else "ASC"
-
args = [room_id]
-
- bounds = _get_token_bound(from_key, from_comp)
- if to_key:
- bounds = "%s AND %s" % (
- bounds, _get_token_bound(to_key, to_comp)
- )
+ if direction == 'b':
+ order = "DESC"
+ bounds = _StreamToken.parse(from_key).upper_bound()
+ if to_key:
+ bounds = "%s AND %s" % (
+ bounds, _StreamToken.parse(to_key).lower_bound()
+ )
+ else:
+ order = "ASC"
+ bounds = _StreamToken.parse(from_key).lower_bound()
+ if to_key:
+ bounds = "%s AND %s" % (
+ bounds, _StreamToken.parse(to_key).upper_bound()
+ )
if int(limit) > 0:
args.append(int(limit))
@@ -249,9 +246,13 @@ class StreamStore(SQLBaseStore):
topo = rows[-1]["topological_ordering"]
toke = rows[-1]["stream_ordering"]
if direction == 'b':
- topo -= 1
+ # Tokens are positions between events.
+ # This token points *after* the last event in the chunk.
+ # We need it to point to the event before it in the chunk
+ # when we are going backwards so we subtract one from the
+ # stream part.
toke -= 1
- next_token = "t%s-%s" % (topo, toke)
+ next_token = str(_StreamToken(topo, toke))
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_key if to_key else from_key
@@ -262,35 +263,62 @@ class StreamStore(SQLBaseStore):
get_prev_content=True
)
+ self._set_before_and_after(events, rows)
+
return events, next_token,
return self.runInteraction("paginate_room_events", f)
def get_recent_events_for_room(self, room_id, limit, end_token,
- with_feedback=False):
+ with_feedback=False, from_token=None):
# TODO (erikj): Handle compressed feedback
- sql = (
- "SELECT stream_ordering, topological_ordering, event_id FROM events "
- "WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 "
- "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
- )
+ end_token = _StreamToken.parse_stream_token(end_token)
- def f(txn):
- txn.execute(sql, (room_id, end_token, limit,))
+ if from_token is None:
+ sql = (
+ "SELECT stream_ordering, topological_ordering, event_id"
+ " FROM events"
+ " WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0"
+ " ORDER BY topological_ordering DESC, stream_ordering DESC"
+ " LIMIT ?"
+ )
+ else:
+ from_token = _StreamToken.parse_stream_token(from_token)
+ sql = (
+ "SELECT stream_ordering, topological_ordering, event_id"
+ " FROM events"
+ " WHERE room_id = ? AND stream_ordering > ?"
+ " AND stream_ordering <= ? AND outlier = 0"
+ " ORDER BY topological_ordering DESC, stream_ordering DESC"
+ " LIMIT ?"
+ )
+
+ def get_recent_events_for_room_txn(txn):
+ if from_token is None:
+ txn.execute(sql, (room_id, end_token.stream, limit,))
+ else:
+ txn.execute(sql, (
+ room_id, from_token.stream, end_token.stream, limit
+ ))
rows = self.cursor_to_dict(txn)
rows.reverse() # As we selected with reverse ordering
if rows:
+ # Tokens are positions between events.
+ # This token points *after* the last event in the chunk.
+ # We need it to point to the event before it in the chunk
+ # since we are going backwards so we subtract one from the
+ # stream part.
topo = rows[0]["topological_ordering"]
- toke = rows[0]["stream_ordering"]
- start_token = "t%s-%s" % (topo, toke)
+ toke = rows[0]["stream_ordering"] - 1
+ start_token = str(_StreamToken(topo, toke))
- token = (start_token, end_token)
+ token = (start_token, str(end_token))
else:
- token = (end_token, end_token)
+ token = (str(end_token), str(end_token))
events = self._get_events_txn(
txn,
@@ -298,9 +326,13 @@ class StreamStore(SQLBaseStore):
get_prev_content=True
)
+ self._set_before_and_after(events, rows)
+
return events, token
- return self.runInteraction("get_recent_events_for_room", f)
+ return self.runInteraction(
+ "get_recent_events_for_room", get_recent_events_for_room_txn
+ )
def get_room_events_max_id(self):
return self.runInteraction(
@@ -322,3 +354,12 @@ class StreamStore(SQLBaseStore):
key = res[0]["m"]
return "s%d" % (key,)
+
+ @staticmethod
+ def _set_before_and_after(events, rows):
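+        # Annotate each event with the stream tokens immediately before and
+        # after it, so callers can paginate around individual events.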
+ for event, row in zip(events, rows):
+ stream = row["stream_ordering"]
+ topo = event.depth
+ internal = event.internal_metadata
+ internal.before = str(_StreamToken(topo, stream - 1))
+ internal.after = str(_StreamToken(topo, stream))
|