summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py71
-rw-r--r--synapse/storage/client_ips.py7
-rw-r--r--synapse/storage/event_push_actions.py53
-rw-r--r--synapse/storage/events.py77
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/storage/receipts.py19
-rw-r--r--synapse/storage/registration.py18
-rw-r--r--synapse/storage/schema/delta/48/add_user_consent.sql18
-rw-r--r--synapse/storage/schema/delta/49/add_user_daily_visits.sql21
-rw-r--r--synapse/storage/schema/delta/49/add_user_ips_last_seen_only_index.sql17
10 files changed, 229 insertions, 74 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 8cdfd50f90..4551cf8774 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -14,6 +14,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import datetime
+from dateutil import tz
+import time
+import logging
+
 from synapse.storage.devices import DeviceStore
 from .appservice import (
     ApplicationServiceStore, ApplicationServiceTransactionStore
@@ -55,10 +60,6 @@ from .engines import PostgresEngine
 from synapse.api.constants import PresenceState
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
-
-import logging
-
-
 logger = logging.getLogger(__name__)
 
 
@@ -213,6 +214,9 @@ class DataStore(RoomMemberStore, RoomStore,
         self._stream_order_on_start = self.get_room_max_stream_ordering()
         self._min_stream_order_on_start = self.get_room_min_stream_ordering()
 
+        # Used in _generate_user_daily_visits to keep track of progress
+        self._last_user_visit_update = self._get_start_of_day()
+
         super(DataStore, self).__init__(db_conn, hs)
 
     def take_presence_startup_info(self):
@@ -347,6 +351,65 @@ class DataStore(RoomMemberStore, RoomStore,
 
         return self.runInteraction("count_r30_users", _count_r30_users)
 
+    def _get_start_of_day(self):
+        """
+        Returns millisecond unixtime for start of UTC day.
+        """
+        now = datetime.datetime.utcnow()
+        today_start = datetime.datetime(now.year, now.month,
+                                        now.day, tzinfo=tz.tzutc())
+        return int(time.mktime(today_start.timetuple())) * 1000
+
+    def generate_user_daily_visits(self):
+        """
+        Generates daily visit data for use in cohort/ retention analysis
+        """
+        def _generate_user_daily_visits(txn):
+            logger.info("Calling _generate_user_daily_visits")
+            today_start = self._get_start_of_day()
+            a_day_in_milliseconds = 24 * 60 * 60 * 1000
+            now = self.clock.time_msec()
+
+            sql = """
+                INSERT INTO user_daily_visits (user_id, device_id, timestamp)
+                    SELECT u.user_id, u.device_id, ?
+                    FROM user_ips AS u
+                    LEFT JOIN (
+                      SELECT user_id, device_id, timestamp FROM user_daily_visits
+                      WHERE timestamp IS ?
+                    ) udv
+                    ON u.user_id = udv.user_id AND u.device_id=udv.device_id
+                    WHERE last_seen > ? AND last_seen <= ? AND udv.timestamp IS NULL
+            """
+
+            # This means that the day has rolled over but there could still
+            # be entries from the previous day. There is an edge case
+            # where if the user logs in at 23:59 and overwrites their
+            # last_seen at 00:01 then they will not be counted in the
+            # previous day's stats - it is important that the query is run
+            # often to minimise this case.
+            if today_start > self._last_user_visit_update:
+                yesterday_start = today_start - a_day_in_milliseconds
+                txn.execute(sql, (
+                    yesterday_start, yesterday_start,
+                    self._last_user_visit_update, today_start
+                ))
+                self._last_user_visit_update = today_start
+
+            txn.execute(sql, (
+                today_start, today_start,
+                self._last_user_visit_update,
+                now
+            ))
+            # Update _last_user_visit_update to now. The reason to do this
+            # rather just clamping to the beginning of the day is to limit
+            # the size of the join - meaning that the query can be run more
+            # frequently
+            self._last_user_visit_update = now
+
+        return self.runInteraction("generate_user_daily_visits",
+                                   _generate_user_daily_visits)
+
     def get_users(self):
         """Function to reterive a list of users in users table.
 
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 7b44dae0fc..ba46907737 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -55,6 +55,13 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
             columns=["user_id", "last_seen"],
         )
 
+        self.register_background_index_update(
+            "user_ips_last_seen_only_index",
+            index_name="user_ips_last_seen_only",
+            table="user_ips",
+            columns=["last_seen"],
+        )
+
         # (user_id, access_token, ip) -> (user_agent, device_id, last_seen)
         self._batch_row_update = {}
 
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index c22762eb5c..f084a5f54b 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -18,8 +18,6 @@ from synapse.storage._base import SQLBaseStore, LoggingTransaction
 from twisted.internet import defer
 from synapse.util.async import sleep
 from synapse.util.caches.descriptors import cachedInlineCallbacks
-from synapse.types import RoomStreamToken
-from .stream import lower_bound
 
 import logging
 import simplejson as json
@@ -99,7 +97,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
     def _get_unread_counts_by_receipt_txn(self, txn, room_id, user_id,
                                           last_read_event_id):
         sql = (
-            "SELECT stream_ordering, topological_ordering"
+            "SELECT stream_ordering"
             " FROM events"
             " WHERE room_id = ? AND event_id = ?"
         )
@@ -111,17 +109,12 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             return {"notify_count": 0, "highlight_count": 0}
 
         stream_ordering = results[0][0]
-        topological_ordering = results[0][1]
 
         return self._get_unread_counts_by_pos_txn(
-            txn, room_id, user_id, topological_ordering, stream_ordering
+            txn, room_id, user_id, stream_ordering
         )
 
-    def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, topological_ordering,
-                                      stream_ordering):
-        token = RoomStreamToken(
-            topological_ordering, stream_ordering
-        )
+    def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering):
 
         # First get number of notifications.
         # We don't need to put a notif=1 clause as all rows always have
@@ -132,10 +125,10 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             " WHERE"
             " user_id = ?"
             " AND room_id = ?"
-            " AND %s"
-        ) % (lower_bound(token, self.database_engine, inclusive=False),)
+            " AND stream_ordering > ?"
+        )
 
-        txn.execute(sql, (user_id, room_id))
+        txn.execute(sql, (user_id, room_id, stream_ordering))
         row = txn.fetchone()
         notify_count = row[0] if row else 0
 
@@ -155,10 +148,10 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             " highlight = 1"
             " AND user_id = ?"
             " AND room_id = ?"
-            " AND %s"
-        ) % (lower_bound(token, self.database_engine, inclusive=False),)
+            " AND stream_ordering > ?"
+        )
 
-        txn.execute(sql, (user_id, room_id))
+        txn.execute(sql, (user_id, room_id, stream_ordering))
         row = txn.fetchone()
         highlight_count = row[0] if row else 0
 
@@ -209,7 +202,6 @@ class EventPushActionsWorkerStore(SQLBaseStore):
                 "   ep.highlight "
                 " FROM ("
                 "   SELECT room_id,"
-                "       MAX(topological_ordering) as topological_ordering,"
                 "       MAX(stream_ordering) as stream_ordering"
                 "   FROM events"
                 "   INNER JOIN receipts_linearized USING (room_id, event_id)"
@@ -219,13 +211,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
                 " event_push_actions AS ep"
                 " WHERE"
                 "   ep.room_id = rl.room_id"
-                "   AND ("
-                "       ep.topological_ordering > rl.topological_ordering"
-                "       OR ("
-                "           ep.topological_ordering = rl.topological_ordering"
-                "           AND ep.stream_ordering > rl.stream_ordering"
-                "       )"
-                "   )"
+                "   AND ep.stream_ordering > rl.stream_ordering"
                 "   AND ep.user_id = ?"
                 "   AND ep.stream_ordering > ?"
                 "   AND ep.stream_ordering <= ?"
@@ -318,7 +304,6 @@ class EventPushActionsWorkerStore(SQLBaseStore):
                 "  ep.highlight, e.received_ts"
                 " FROM ("
                 "   SELECT room_id,"
-                "       MAX(topological_ordering) as topological_ordering,"
                 "       MAX(stream_ordering) as stream_ordering"
                 "   FROM events"
                 "   INNER JOIN receipts_linearized USING (room_id, event_id)"
@@ -329,13 +314,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
                 " INNER JOIN events AS e USING (room_id, event_id)"
                 " WHERE"
                 "   ep.room_id = rl.room_id"
-                "   AND ("
-                "       ep.topological_ordering > rl.topological_ordering"
-                "       OR ("
-                "           ep.topological_ordering = rl.topological_ordering"
-                "           AND ep.stream_ordering > rl.stream_ordering"
-                "       )"
-                "   )"
+                "   AND ep.stream_ordering > rl.stream_ordering"
                 "   AND ep.user_id = ?"
                 "   AND ep.stream_ordering > ?"
                 "   AND ep.stream_ordering <= ?"
@@ -762,10 +741,10 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
         )
 
     def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
-                                            topological_ordering, stream_ordering):
+                                            stream_ordering):
         """
         Purges old push actions for a user and room before a given
-        topological_ordering.
+        stream_ordering.
 
         We however keep a months worth of highlighted notifications, so that
         users can still get a list of recent highlights.
@@ -774,7 +753,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
             txn: The transcation
             room_id: Room ID to delete from
             user_id: user ID to delete for
-            topological_ordering: The lowest topological ordering which will
+            stream_ordering: The lowest stream ordering which will
                                   not be deleted.
         """
         txn.call_after(
@@ -793,9 +772,9 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
         txn.execute(
             "DELETE FROM event_push_actions "
             " WHERE user_id = ? AND room_id = ? AND "
-            " topological_ordering <= ?"
+            " stream_ordering <= ?"
             " AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
-            (user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
+            (user_id, room_id, stream_ordering, self.stream_ordering_month_ago)
         )
 
         txn.execute("""
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 05cde96afc..5ebef98c4f 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -33,7 +33,7 @@ from synapse.util.metrics import Measure
 from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
-from synapse.types import get_domain_from_id
+from synapse.types import get_domain_from_id, RoomStreamToken
 import synapse.metrics
 
 # these are only included to make the type annotations work
@@ -1803,15 +1803,14 @@ class EventsStore(EventsWorkerStore):
         return self.runInteraction("get_all_new_events", get_all_new_events_txn)
 
     def purge_history(
-        self, room_id, topological_ordering, delete_local_events,
+        self, room_id, token, delete_local_events,
     ):
         """Deletes room history before a certain point
 
         Args:
             room_id (str):
 
-            topological_ordering (int):
-                minimum topo ordering to preserve
+            token (str): A topological token to delete events before
 
             delete_local_events (bool):
                 if True, we will delete local events as well as remote ones
@@ -1821,13 +1820,15 @@ class EventsStore(EventsWorkerStore):
 
         return self.runInteraction(
             "purge_history",
-            self._purge_history_txn, room_id, topological_ordering,
+            self._purge_history_txn, room_id, token,
             delete_local_events,
         )
 
     def _purge_history_txn(
-        self, txn, room_id, topological_ordering, delete_local_events,
+        self, txn, room_id, token_str, delete_local_events,
     ):
+        token = RoomStreamToken.parse(token_str)
+
         # Tables that should be pruned:
         #     event_auth
         #     event_backward_extremities
@@ -1872,6 +1873,13 @@ class EventsStore(EventsWorkerStore):
             " ON events_to_purge(should_delete)",
         )
 
+        # We do joins against events_to_purge for e.g. calculating state
+        # groups to purge, etc., so lets make an index.
+        txn.execute(
+            "CREATE INDEX events_to_purge_id"
+            " ON events_to_purge(event_id)",
+        )
+
         # First ensure that we're not about to delete all the forward extremeties
         txn.execute(
             "SELECT e.event_id, e.depth FROM events as e "
@@ -1884,7 +1892,7 @@ class EventsStore(EventsWorkerStore):
         rows = txn.fetchall()
         max_depth = max(row[0] for row in rows)
 
-        if max_depth <= topological_ordering:
+        if max_depth <= token.topological:
             # We need to ensure we don't delete all the events from the datanase
             # otherwise we wouldn't be able to send any events (due to not
             # having any backwards extremeties)
@@ -1900,7 +1908,7 @@ class EventsStore(EventsWorkerStore):
             should_delete_expr += " AND event_id NOT LIKE ?"
             should_delete_params += ("%:" + self.hs.hostname, )
 
-        should_delete_params += (room_id, topological_ordering)
+        should_delete_params += (room_id, token.topological)
 
         txn.execute(
             "INSERT INTO events_to_purge"
@@ -1923,13 +1931,13 @@ class EventsStore(EventsWorkerStore):
         logger.info("[purge] Finding new backward extremities")
 
         # We calculate the new entries for the backward extremeties by finding
-        # all events that point to events that are to be purged
+        # events to be purged that are pointed to by events we're not going to
+        # purge.
         txn.execute(
             "SELECT DISTINCT e.event_id FROM events_to_purge AS e"
             " INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id"
-            " INNER JOIN events AS e2 ON e2.event_id = ed.event_id"
-            " WHERE e2.topological_ordering >= ?",
-            (topological_ordering, )
+            " LEFT JOIN events_to_purge AS ep2 ON ed.event_id = ep2.event_id"
+            " WHERE ep2.event_id IS NULL",
         )
         new_backwards_extrems = txn.fetchall()
 
@@ -1953,16 +1961,22 @@ class EventsStore(EventsWorkerStore):
 
         # Get all state groups that are only referenced by events that are
         # to be deleted.
-        txn.execute(
-            "SELECT state_group FROM event_to_state_groups"
-            " INNER JOIN events USING (event_id)"
-            " WHERE state_group IN ("
-            "   SELECT DISTINCT state_group FROM events_to_purge"
-            "   INNER JOIN event_to_state_groups USING (event_id)"
-            " )"
-            " GROUP BY state_group HAVING MAX(topological_ordering) < ?",
-            (topological_ordering, )
-        )
+        # This works by first getting state groups that we may want to delete,
+        # joining against event_to_state_groups to get events that use that
+        # state group, then left joining against events_to_purge again. Any
+        # state group where the left join produce *no nulls* are referenced
+        # only by events that are going to be purged.
+        txn.execute("""
+            SELECT state_group FROM
+            (
+                SELECT DISTINCT state_group FROM events_to_purge
+                INNER JOIN event_to_state_groups USING (event_id)
+            ) AS sp
+            INNER JOIN event_to_state_groups USING (state_group)
+            LEFT JOIN events_to_purge AS ep USING (event_id)
+            GROUP BY state_group
+            HAVING SUM(CASE WHEN ep.event_id IS NULL THEN 1 ELSE 0 END) = 0
+        """)
 
         state_rows = txn.fetchall()
         logger.info("[purge] found %i redundant state groups", len(state_rows))
@@ -2109,10 +2123,25 @@ class EventsStore(EventsWorkerStore):
         #
         # So, let's stick it at the end so that we don't block event
         # persistence.
-        logger.info("[purge] updating room_depth")
+        #
+        # We do this by calculating the minimum depth of the backwards
+        # extremities. However, the events in event_backward_extremities
+        # are ones we don't have yet so we need to look at the events that
+        # point to it via event_edges table.
+        txn.execute("""
+            SELECT COALESCE(MIN(depth), 0)
+            FROM event_backward_extremities AS eb
+            INNER JOIN event_edges AS eg ON eg.prev_event_id = eb.event_id
+            INNER JOIN events AS e ON e.event_id = eg.event_id
+            WHERE eb.room_id = ?
+        """, (room_id,))
+        min_depth, = txn.fetchone()
+
+        logger.info("[purge] updating room_depth to %d", min_depth)
+
         txn.execute(
             "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
-            (topological_ordering, room_id,)
+            (min_depth, room_id,)
         )
 
         # finally, drop the temp table. this will commit the txn in sqlite,
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 04411a665f..c08e9cd65a 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -26,7 +26,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 48
+SCHEMA_VERSION = 49
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 63997ed449..709c69a926 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -297,18 +297,22 @@ class ReceiptsWorkerStore(SQLBaseStore):
         if receipt_type != "m.read":
             return
 
-        # Returns an ObservableDeferred
+        # Returns either an ObservableDeferred or the raw result
         res = self.get_users_with_read_receipts_in_room.cache.get(
             room_id, None, update_metrics=False,
         )
 
-        if res:
-            if isinstance(res, defer.Deferred) and res.called:
+        # first handle the Deferred case
+        if isinstance(res, defer.Deferred):
+            if res.called:
                 res = res.result
-            if user_id in res:
-                # We'd only be adding to the set, so no point invalidating if the
-                # user is already there
-                return
+            else:
+                res = None
+
+        if res and user_id in res:
+            # We'd only be adding to the set, so no point invalidating if the
+            # user is already there
+            return
 
         self.get_users_with_read_receipts_in_room.invalidate((room_id,))
 
@@ -407,7 +411,6 @@ class ReceiptsStore(ReceiptsWorkerStore):
                 txn,
                 room_id=room_id,
                 user_id=user_id,
-                topological_ordering=topological_ordering,
                 stream_ordering=stream_ordering,
             )
 
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index c05ce4612f..8d1a01f1ee 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -286,6 +286,24 @@ class RegistrationStore(RegistrationWorkerStore,
             "user_set_password_hash", user_set_password_hash_txn
         )
 
+    def user_set_consent_version(self, user_id, consent_version):
+        """Updates the user table to record privacy policy consent
+
+        Args:
+            user_id (str): full mxid of the user to update
+            consent_version (str): version of the policy the user has consented
+                to
+
+        Raises:
+            StoreError(404) if user not found
+        """
+        return self._simple_update_one(
+            table='users',
+            keyvalues={'name': user_id, },
+            updatevalues={'consent_version': consent_version, },
+            desc="user_set_consent_version"
+        )
+
     def user_delete_access_tokens(self, user_id, except_token_id=None,
                                   device_id=None):
         """
diff --git a/synapse/storage/schema/delta/48/add_user_consent.sql b/synapse/storage/schema/delta/48/add_user_consent.sql
new file mode 100644
index 0000000000..5237491506
--- /dev/null
+++ b/synapse/storage/schema/delta/48/add_user_consent.sql
@@ -0,0 +1,18 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* record the version of the privacy policy the user has consented to
+ */
+ALTER TABLE users ADD COLUMN consent_version TEXT;
diff --git a/synapse/storage/schema/delta/49/add_user_daily_visits.sql b/synapse/storage/schema/delta/49/add_user_daily_visits.sql
new file mode 100644
index 0000000000..3dd478196f
--- /dev/null
+++ b/synapse/storage/schema/delta/49/add_user_daily_visits.sql
@@ -0,0 +1,21 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE user_daily_visits ( user_id TEXT NOT NULL,
+                                 device_id TEXT,
+                                 timestamp BIGINT NOT NULL );
+CREATE INDEX user_daily_visits_uts_idx ON user_daily_visits(user_id, timestamp);
+CREATE INDEX user_daily_visits_ts_idx ON user_daily_visits(timestamp);
diff --git a/synapse/storage/schema/delta/49/add_user_ips_last_seen_only_index.sql b/synapse/storage/schema/delta/49/add_user_ips_last_seen_only_index.sql
new file mode 100644
index 0000000000..3a4ed59b5b
--- /dev/null
+++ b/synapse/storage/schema/delta/49/add_user_ips_last_seen_only_index.sql
@@ -0,0 +1,17 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('user_ips_last_seen_only_index', '{}');