summary refs log tree commit diff
path: root/synapse/storage/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/__init__.py')
-rw-r--r--synapse/storage/__init__.py296
1 files changed, 219 insertions, 77 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 4beb951b9f..a63c59a8a2 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -29,10 +29,14 @@ from .stream import StreamStore
 from .transactions import TransactionStore
 from .keys import KeyStore
 from .event_federation import EventFederationStore
+from .pusher import PusherStore
+from .push_rule import PushRuleStore
 from .media_repository import MediaRepositoryStore
+from .rejections import RejectionsStore
 
 from .state import StateStore
 from .signatures import SignatureStore
+from .filtering import FilteringStore
 
 from syutil.base64util import decode_base64
 from syutil.jsonutil import encode_canonical_json
@@ -60,13 +64,16 @@ SCHEMAS = [
     "state",
     "event_edges",
     "event_signatures",
+    "pusher",
     "media_repository",
+    "filtering",
+    "rejections",
 ]
 
 
 # Remember to update this number every time an incompatible change is made to
 # database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 11
+SCHEMA_VERSION = 12
 
 
 class _RollbackButIsFineException(Exception):
@@ -82,6 +89,10 @@ class DataStore(RoomMemberStore, RoomStore,
                 DirectoryStore, KeyStore, StateStore, SignatureStore,
                 EventFederationStore,
                 MediaRepositoryStore,
+                RejectionsStore,
+                FilteringStore,
+                PusherStore,
+                PushRuleStore
                 ):
 
     def __init__(self, hs):
@@ -117,21 +128,144 @@ class DataStore(RoomMemberStore, RoomStore,
             pass
 
     @defer.inlineCallbacks
-    def get_event(self, event_id, allow_none=False):
-        events = yield self._get_events([event_id])
+    def get_event(self, event_id, check_redacted=True,
+                  get_prev_content=False, allow_rejected=False,
+                  allow_none=False):
+        """Get an event from the database by event_id.
+
+        Args:
+            event_id (str): The event_id of the event to fetch
+            check_redacted (bool): If True, check if event has been redacted
+                and redact it.
+            get_prev_content (bool): If True and event is a state event,
+                include the previous states content in the unsigned field.
+            allow_rejected (bool): If True return rejected events.
+            allow_none (bool): If True, return None if no event found, if
+                False throw an exception.
+
+        Returns:
+            Deferred : A FrozenEvent.
+        """
+        event = yield self.runInteraction(
+            "get_event", self._get_event_txn,
+            event_id,
+            check_redacted=check_redacted,
+            get_prev_content=get_prev_content,
+            allow_rejected=allow_rejected,
+        )
 
-        if not events:
-            if allow_none:
-                defer.returnValue(None)
-            else:
-                raise RuntimeError("Could not find event %s" % (event_id,))
+        if not event and not allow_none:
+            raise RuntimeError("Could not find event %s" % (event_id,))
 
-        defer.returnValue(events[0])
+        defer.returnValue(event)
 
     @log_function
     def _persist_event_txn(self, txn, event, context, backfilled,
                            stream_ordering=None, is_new_state=True,
                            current_state=None):
+
+        # We purposefully do this first since if we include a `current_state`
+        # key, we *want* to update the `current_state_events` table
+        if current_state:
+            txn.execute(
+                "DELETE FROM current_state_events WHERE room_id = ?",
+                (event.room_id,)
+            )
+
+            for s in current_state:
+                self._simple_insert_txn(
+                    txn,
+                    "current_state_events",
+                    {
+                        "event_id": s.event_id,
+                        "room_id": s.room_id,
+                        "type": s.type,
+                        "state_key": s.state_key,
+                    },
+                    or_replace=True,
+                )
+
+        if event.is_state() and is_new_state:
+            if not backfilled and not context.rejected:
+                self._simple_insert_txn(
+                    txn,
+                    table="state_forward_extremities",
+                    values={
+                        "event_id": event.event_id,
+                        "room_id": event.room_id,
+                        "type": event.type,
+                        "state_key": event.state_key,
+                    },
+                    or_replace=True,
+                )
+
+                for prev_state_id, _ in event.prev_state:
+                    self._simple_delete_txn(
+                        txn,
+                        table="state_forward_extremities",
+                        keyvalues={
+                            "event_id": prev_state_id,
+                        }
+                    )
+
+        outlier = event.internal_metadata.is_outlier()
+
+        if not outlier:
+            self._store_state_groups_txn(txn, event, context)
+
+            self._update_min_depth_for_room_txn(
+                txn,
+                event.room_id,
+                event.depth
+            )
+
+        self._handle_prev_events(
+            txn,
+            outlier=outlier,
+            event_id=event.event_id,
+            prev_events=event.prev_events,
+            room_id=event.room_id,
+        )
+
+        have_persisted = self._simple_select_one_onecol_txn(
+            txn,
+            table="event_json",
+            keyvalues={"event_id": event.event_id},
+            retcol="event_id",
+            allow_none=True,
+        )
+
+        metadata_json = encode_canonical_json(
+            event.internal_metadata.get_dict()
+        )
+
+        # If we have already persisted this event, we don't need to do any
+        # more processing.
+        # The processing above must be done on every call to persist event,
+        # since they might not have happened on previous calls. For example,
+        # if we are persisting an event that we had persisted as an outlier,
+        # but is no longer one.
+        if have_persisted:
+            if not outlier:
+                sql = (
+                    "UPDATE event_json SET internal_metadata = ?"
+                    " WHERE event_id = ?"
+                )
+                txn.execute(
+                    sql,
+                    (metadata_json.decode("UTF-8"), event.event_id,)
+                )
+
+                sql = (
+                    "UPDATE events SET outlier = 0"
+                    " WHERE event_id = ?"
+                )
+                txn.execute(
+                    sql,
+                    (event.event_id,)
+                )
+            return
+
         if event.type == EventTypes.Member:
             self._store_room_member_txn(txn, event)
         elif event.type == EventTypes.Feedback:
@@ -143,8 +277,6 @@ class DataStore(RoomMemberStore, RoomStore,
         elif event.type == EventTypes.Redaction:
             self._store_redaction(txn, event)
 
-        outlier = event.internal_metadata.is_outlier()
-
         event_dict = {
             k: v
             for k, v in event.get_dict().items()
@@ -154,10 +286,6 @@ class DataStore(RoomMemberStore, RoomStore,
             ]
         }
 
-        metadata_json = encode_canonical_json(
-            event.internal_metadata.get_dict()
-        )
-
         self._simple_insert_txn(
             txn,
             table="event_json",
@@ -213,38 +341,10 @@ class DataStore(RoomMemberStore, RoomStore,
             )
             raise _RollbackButIsFineException("_persist_event")
 
-        self._handle_prev_events(
-            txn,
-            outlier=outlier,
-            event_id=event.event_id,
-            prev_events=event.prev_events,
-            room_id=event.room_id,
-        )
-
-        if not outlier:
-            self._store_state_groups_txn(txn, event, context)
-
-        if current_state:
-            txn.execute(
-                "DELETE FROM current_state_events WHERE room_id = ?",
-                (event.room_id,)
-            )
-
-            for s in current_state:
-                self._simple_insert_txn(
-                    txn,
-                    "current_state_events",
-                    {
-                        "event_id": s.event_id,
-                        "room_id": s.room_id,
-                        "type": s.type,
-                        "state_key": s.state_key,
-                    },
-                    or_replace=True,
-                )
+        if context.rejected:
+            self._store_rejections_txn(txn, event.event_id, context.rejected)
 
-        is_state = hasattr(event, "state_key") and event.state_key is not None
-        if is_state:
+        if event.is_state():
             vals = {
                 "event_id": event.event_id,
                 "room_id": event.room_id,
@@ -252,6 +352,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 "state_key": event.state_key,
             }
 
+            # TODO: How does this work with backfilling?
             if hasattr(event, "replaces_state"):
                 vals["prev_state"] = event.replaces_state
 
@@ -262,7 +363,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 or_replace=True,
             )
 
-            if is_new_state:
+            if is_new_state and not context.rejected:
                 self._simple_insert_txn(
                     txn,
                     "current_state_events",
@@ -288,28 +389,6 @@ class DataStore(RoomMemberStore, RoomStore,
                     or_ignore=True,
                 )
 
-            if not backfilled:
-                self._simple_insert_txn(
-                    txn,
-                    table="state_forward_extremities",
-                    values={
-                        "event_id": event.event_id,
-                        "room_id": event.room_id,
-                        "type": event.type,
-                        "state_key": event.state_key,
-                    },
-                    or_replace=True,
-                )
-
-                for prev_state_id, _ in event.prev_state:
-                    self._simple_delete_txn(
-                        txn,
-                        table="state_forward_extremities",
-                        keyvalues={
-                            "event_id": prev_state_id,
-                        }
-                    )
-
         for hash_alg, hash_base64 in event.hashes.items():
             hash_bytes = decode_base64(hash_base64)
             self._store_event_content_hash_txn(
@@ -340,13 +419,6 @@ class DataStore(RoomMemberStore, RoomStore,
             txn, event.event_id, ref_alg, ref_hash_bytes
         )
 
-        if not outlier:
-            self._update_min_depth_for_room_txn(
-                txn,
-                event.room_id,
-                event.depth
-            )
-
     def _store_redaction(self, txn, event):
         txn.execute(
             "INSERT OR IGNORE INTO redactions "
@@ -370,9 +442,12 @@ class DataStore(RoomMemberStore, RoomStore,
             "redacted": del_sql,
         }
 
-        if event_type:
+        if event_type and state_key is not None:
             sql += " AND s.type = ? AND s.state_key = ? "
             args = (room_id, event_type, state_key)
+        elif event_type:
+            sql += " AND s.type = ?"
+            args = (room_id, event_type)
         else:
             args = (room_id, )
 
@@ -382,6 +457,41 @@ class DataStore(RoomMemberStore, RoomStore,
         defer.returnValue(events)
 
     @defer.inlineCallbacks
+    def get_room_name_and_aliases(self, room_id):
+        del_sql = (
+            "SELECT event_id FROM redactions WHERE redacts = e.event_id "
+            "LIMIT 1"
+        )
+
+        sql = (
+            "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
+            "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
+            "INNER JOIN state_events as s ON e.event_id = s.event_id "
+            "WHERE c.room_id = ? "
+        ) % {
+            "redacted": del_sql,
+        }
+
+        sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')"
+        sql += " OR s.type = 'm.room.aliases')"
+        args = (room_id,)
+
+        results = yield self._execute_and_decode(sql, *args)
+
+        events = yield self._parse_events(results)
+
+        name = None
+        aliases = []
+
+        for e in events:
+            if e.type == 'm.room.name':
+                name = e.content['name']
+            elif e.type == 'm.room.aliases':
+                aliases.extend(e.content['aliases'])
+
+        defer.returnValue((name, aliases))
+
+    @defer.inlineCallbacks
     def _get_min_token(self):
         row = yield self._execute(
             None,
@@ -417,6 +527,38 @@ class DataStore(RoomMemberStore, RoomStore,
             ],
         )
 
+    def have_events(self, event_ids):
+        """Given a list of event ids, check if we have already processed them.
+
+        Returns:
+            dict: Has an entry for each event id we already have seen. Maps to
+            the rejected reason string if we rejected the event, else maps to
+            None.
+        """
+        if not event_ids:
+            return defer.succeed({})
+
+        def f(txn):
+            sql = (
+                "SELECT e.event_id, reason FROM events as e "
+                "LEFT JOIN rejections as r ON e.event_id = r.event_id "
+                "WHERE e.event_id = ?"
+            )
+
+            res = {}
+            for event_id in event_ids:
+                txn.execute(sql, (event_id,))
+                row = txn.fetchone()
+                if row:
+                    _, rejected = row
+                    res[event_id] = rejected
+
+            return res
+
+        return self.runInteraction(
+            "have_events", f,
+        )
+
 
 def schema_path(schema):
     """ Get a filesystem path for the named database schema