diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 4beb951b9f..02b1f06854 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -29,10 +29,14 @@ from .stream import StreamStore
from .transactions import TransactionStore
from .keys import KeyStore
from .event_federation import EventFederationStore
+from .pusher import PusherStore
+from .push_rule import PushRuleStore
from .media_repository import MediaRepositoryStore
+from .rejections import RejectionsStore
from .state import StateStore
from .signatures import SignatureStore
+from .filtering import FilteringStore
from syutil.base64util import decode_base64
from syutil.jsonutil import encode_canonical_json
@@ -40,7 +44,6 @@ from syutil.jsonutil import encode_canonical_json
from synapse.crypto.event_signing import compute_event_reference_hash
-import json
import logging
import os
@@ -60,13 +63,16 @@ SCHEMAS = [
"state",
"event_edges",
"event_signatures",
+ "pusher",
"media_repository",
+ "filtering",
+ "rejections",
]
# Remember to update this number every time an incompatible change is made to
# database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 11
+SCHEMA_VERSION = 12
class _RollbackButIsFineException(Exception):
@@ -82,6 +88,10 @@ class DataStore(RoomMemberStore, RoomStore,
DirectoryStore, KeyStore, StateStore, SignatureStore,
EventFederationStore,
MediaRepositoryStore,
+ RejectionsStore,
+ FilteringStore,
+ PusherStore,
+ PushRuleStore
):
def __init__(self, hs):
@@ -117,21 +127,147 @@ class DataStore(RoomMemberStore, RoomStore,
pass
@defer.inlineCallbacks
- def get_event(self, event_id, allow_none=False):
- events = yield self._get_events([event_id])
+ def get_event(self, event_id, check_redacted=True,
+ get_prev_content=False, allow_rejected=False,
+ allow_none=False):
+ """Get an event from the database by event_id.
+
+ Args:
+ event_id (str): The event_id of the event to fetch
+ check_redacted (bool): If True, check if event has been redacted
+ and redact it.
+ get_prev_content (bool): If True and event is a state event,
+ include the previous states content in the unsigned field.
+ allow_rejected (bool): If True return rejected events.
+ allow_none (bool): If True, return None if no event found, if
+ False throw an exception.
+
+ Returns:
+ Deferred : A FrozenEvent.
+ """
+ event = yield self.runInteraction(
+ "get_event", self._get_event_txn,
+ event_id,
+ check_redacted=check_redacted,
+ get_prev_content=get_prev_content,
+ allow_rejected=allow_rejected,
+ )
- if not events:
- if allow_none:
- defer.returnValue(None)
- else:
- raise RuntimeError("Could not find event %s" % (event_id,))
+ if not event and not allow_none:
+ raise RuntimeError("Could not find event %s" % (event_id,))
- defer.returnValue(events[0])
+ defer.returnValue(event)
@log_function
def _persist_event_txn(self, txn, event, context, backfilled,
stream_ordering=None, is_new_state=True,
current_state=None):
+
+ # Remove the any existing cache entries for the event_id
+ self._get_event_cache.pop(event.event_id)
+
+ # We purposefully do this first since if we include a `current_state`
+ # key, we *want* to update the `current_state_events` table
+ if current_state:
+ txn.execute(
+ "DELETE FROM current_state_events WHERE room_id = ?",
+ (event.room_id,)
+ )
+
+ for s in current_state:
+ self._simple_insert_txn(
+ txn,
+ "current_state_events",
+ {
+ "event_id": s.event_id,
+ "room_id": s.room_id,
+ "type": s.type,
+ "state_key": s.state_key,
+ },
+ or_replace=True,
+ )
+
+ if event.is_state() and is_new_state:
+ if not backfilled and not context.rejected:
+ self._simple_insert_txn(
+ txn,
+ table="state_forward_extremities",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ },
+ or_replace=True,
+ )
+
+ for prev_state_id, _ in event.prev_state:
+ self._simple_delete_txn(
+ txn,
+ table="state_forward_extremities",
+ keyvalues={
+ "event_id": prev_state_id,
+ }
+ )
+
+ outlier = event.internal_metadata.is_outlier()
+
+ if not outlier:
+ self._store_state_groups_txn(txn, event, context)
+
+ self._update_min_depth_for_room_txn(
+ txn,
+ event.room_id,
+ event.depth
+ )
+
+ self._handle_prev_events(
+ txn,
+ outlier=outlier,
+ event_id=event.event_id,
+ prev_events=event.prev_events,
+ room_id=event.room_id,
+ )
+
+ have_persisted = self._simple_select_one_onecol_txn(
+ txn,
+ table="event_json",
+ keyvalues={"event_id": event.event_id},
+ retcol="event_id",
+ allow_none=True,
+ )
+
+ metadata_json = encode_canonical_json(
+ event.internal_metadata.get_dict()
+ )
+
+ # If we have already persisted this event, we don't need to do any
+ # more processing.
+ # The processing above must be done on every call to persist event,
+ # since they might not have happened on previous calls. For example,
+ # if we are persisting an event that we had persisted as an outlier,
+ # but is no longer one.
+ if have_persisted:
+ if not outlier:
+ sql = (
+ "UPDATE event_json SET internal_metadata = ?"
+ " WHERE event_id = ?"
+ )
+ txn.execute(
+ sql,
+ (metadata_json.decode("UTF-8"), event.event_id,)
+ )
+
+ sql = (
+ "UPDATE events SET outlier = 0"
+ " WHERE event_id = ?"
+ )
+ txn.execute(
+ sql,
+ (event.event_id,)
+ )
+ return
+
if event.type == EventTypes.Member:
self._store_room_member_txn(txn, event)
elif event.type == EventTypes.Feedback:
@@ -143,8 +279,6 @@ class DataStore(RoomMemberStore, RoomStore,
elif event.type == EventTypes.Redaction:
self._store_redaction(txn, event)
- outlier = event.internal_metadata.is_outlier()
-
event_dict = {
k: v
for k, v in event.get_dict().items()
@@ -154,10 +288,6 @@ class DataStore(RoomMemberStore, RoomStore,
]
}
- metadata_json = encode_canonical_json(
- event.internal_metadata.get_dict()
- )
-
self._simple_insert_txn(
txn,
table="event_json",
@@ -170,12 +300,16 @@ class DataStore(RoomMemberStore, RoomStore,
or_replace=True,
)
+ content = encode_canonical_json(
+ event.content
+ ).decode("UTF-8")
+
vals = {
"topological_ordering": event.depth,
"event_id": event.event_id,
"type": event.type,
"room_id": event.room_id,
- "content": json.dumps(event.get_dict()["content"]),
+ "content": content,
"processed": True,
"outlier": outlier,
"depth": event.depth,
@@ -195,7 +329,10 @@ class DataStore(RoomMemberStore, RoomStore,
"prev_events",
]
}
- vals["unrecognized_keys"] = json.dumps(unrec)
+
+ vals["unrecognized_keys"] = encode_canonical_json(
+ unrec
+ ).decode("UTF-8")
try:
self._simple_insert_txn(
@@ -213,38 +350,10 @@ class DataStore(RoomMemberStore, RoomStore,
)
raise _RollbackButIsFineException("_persist_event")
- self._handle_prev_events(
- txn,
- outlier=outlier,
- event_id=event.event_id,
- prev_events=event.prev_events,
- room_id=event.room_id,
- )
-
- if not outlier:
- self._store_state_groups_txn(txn, event, context)
-
- if current_state:
- txn.execute(
- "DELETE FROM current_state_events WHERE room_id = ?",
- (event.room_id,)
- )
+ if context.rejected:
+ self._store_rejections_txn(txn, event.event_id, context.rejected)
- for s in current_state:
- self._simple_insert_txn(
- txn,
- "current_state_events",
- {
- "event_id": s.event_id,
- "room_id": s.room_id,
- "type": s.type,
- "state_key": s.state_key,
- },
- or_replace=True,
- )
-
- is_state = hasattr(event, "state_key") and event.state_key is not None
- if is_state:
+ if event.is_state():
vals = {
"event_id": event.event_id,
"room_id": event.room_id,
@@ -252,6 +361,7 @@ class DataStore(RoomMemberStore, RoomStore,
"state_key": event.state_key,
}
+ # TODO: How does this work with backfilling?
if hasattr(event, "replaces_state"):
vals["prev_state"] = event.replaces_state
@@ -262,7 +372,7 @@ class DataStore(RoomMemberStore, RoomStore,
or_replace=True,
)
- if is_new_state:
+ if is_new_state and not context.rejected:
self._simple_insert_txn(
txn,
"current_state_events",
@@ -288,28 +398,6 @@ class DataStore(RoomMemberStore, RoomStore,
or_ignore=True,
)
- if not backfilled:
- self._simple_insert_txn(
- txn,
- table="state_forward_extremities",
- values={
- "event_id": event.event_id,
- "room_id": event.room_id,
- "type": event.type,
- "state_key": event.state_key,
- },
- or_replace=True,
- )
-
- for prev_state_id, _ in event.prev_state:
- self._simple_delete_txn(
- txn,
- table="state_forward_extremities",
- keyvalues={
- "event_id": prev_state_id,
- }
- )
-
for hash_alg, hash_base64 in event.hashes.items():
hash_bytes = decode_base64(hash_base64)
self._store_event_content_hash_txn(
@@ -340,14 +428,9 @@ class DataStore(RoomMemberStore, RoomStore,
txn, event.event_id, ref_alg, ref_hash_bytes
)
- if not outlier:
- self._update_min_depth_for_room_txn(
- txn,
- event.room_id,
- event.depth
- )
-
def _store_redaction(self, txn, event):
+ # invalidate the cache for the redacted event
+ self._get_event_cache.pop(event.redacts)
txn.execute(
"INSERT OR IGNORE INTO redactions "
"(event_id, redacts) VALUES (?,?)",
@@ -370,9 +453,12 @@ class DataStore(RoomMemberStore, RoomStore,
"redacted": del_sql,
}
- if event_type:
+ if event_type and state_key is not None:
sql += " AND s.type = ? AND s.state_key = ? "
args = (room_id, event_type, state_key)
+ elif event_type:
+ sql += " AND s.type = ?"
+ args = (room_id, event_type)
else:
args = (room_id, )
@@ -382,6 +468,41 @@ class DataStore(RoomMemberStore, RoomStore,
defer.returnValue(events)
@defer.inlineCallbacks
+ def get_room_name_and_aliases(self, room_id):
+ del_sql = (
+ "SELECT event_id FROM redactions WHERE redacts = e.event_id "
+ "LIMIT 1"
+ )
+
+ sql = (
+ "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
+ "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
+ "INNER JOIN state_events as s ON e.event_id = s.event_id "
+ "WHERE c.room_id = ? "
+ ) % {
+ "redacted": del_sql,
+ }
+
+ sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')"
+ sql += " OR s.type = 'm.room.aliases')"
+ args = (room_id,)
+
+ results = yield self._execute_and_decode(sql, *args)
+
+ events = yield self._parse_events(results)
+
+ name = None
+ aliases = []
+
+ for e in events:
+ if e.type == 'm.room.name':
+ name = e.content['name']
+ elif e.type == 'm.room.aliases':
+ aliases.extend(e.content['aliases'])
+
+ defer.returnValue((name, aliases))
+
+ @defer.inlineCallbacks
def _get_min_token(self):
row = yield self._execute(
None,
@@ -417,6 +538,38 @@ class DataStore(RoomMemberStore, RoomStore,
],
)
+ def have_events(self, event_ids):
+ """Given a list of event ids, check if we have already processed them.
+
+ Returns:
+ dict: Has an entry for each event id we already have seen. Maps to
+ the rejected reason string if we rejected the event, else maps to
+ None.
+ """
+ if not event_ids:
+ return defer.succeed({})
+
+ def f(txn):
+ sql = (
+ "SELECT e.event_id, reason FROM events as e "
+ "LEFT JOIN rejections as r ON e.event_id = r.event_id "
+ "WHERE e.event_id = ?"
+ )
+
+ res = {}
+ for event_id in event_ids:
+ txn.execute(sql, (event_id,))
+ row = txn.fetchone()
+ if row:
+ _, rejected = row
+ res[event_id] = rejected
+
+ return res
+
+ return self.runInteraction(
+ "have_events", f,
+ )
+
def schema_path(schema):
""" Get a filesystem path for the named database schema
|