summary refs log tree commit diff
path: root/synapse/storage/monthly_active_users.py
blob: 19846cefd9f94acb63e609fbd7c15c46172e323e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from twisted.internet import defer
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.storage.engines import PostgresEngine, Sqlite3Engine

from ._base import SQLBaseStore


class MonthlyActiveUsersStore(SQLBaseStore):
    def __init__(self, dbconn, hs):
        super(MonthlyActiveUsersStore, self).__init__(None, hs)
        self._clock = hs.get_clock()
        self.max_mau_value = hs.config.max_mau_value

    def reap_monthly_active_users(self):
        """
        Cleans out monthly active user table to ensure that no stale
        entries exist.
        Return:
            Defered()
        """
        def _reap_users(txn):
            thirty_days_ago = (
                int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
            )

            if isinstance(self.database_engine, PostgresEngine):
                sql = """
                    DELETE FROM monthly_active_users
                    WHERE timestamp < ?
                    RETURNING user_id
                    """
                txn.execute(sql, (thirty_days_ago,))
                res = txn.fetchall()
                for r in res:
                    self.is_user_monthly_active.invalidate(r)

                sql = """
                    DELETE FROM monthly_active_users
                    ORDER BY timestamp desc
                    LIMIT -1 OFFSET ?
                    RETURNING user_id
                    """
                txn.execute(sql, (self.max_mau_value,))
                res = txn.fetchall()
                for r in res:
                    self.is_user_monthly_active.invalidate(r)
                    print r
                self.get_monthly_active_count.invalidate()
            elif isinstance(self.database_engine, Sqlite3Engine):
                sql = "DELETE FROM monthly_active_users WHERE timestamp < ?"

                txn.execute(sql, (thirty_days_ago,))
                sql = """
                    DELETE FROM monthly_active_users
                    ORDER BY timestamp desc
                    LIMIT -1 OFFSET ?
                    """
                txn.execute(sql, (self.max_mau_value,))

                # It seems poor to invalidate the whole cache, but the alternative
                # is to select then delete which has it's own problem.
                # It seems unlikely that anyone using this feature on large datasets
                # would be using sqlite and if they are then there will be
                # larger perf issues than this one to encourage an upgrade to postgres.

                self.is_user_monthly_active.invalidate_all()
                self.get_monthly_active_count.invalidate_all()

        return self.runInteraction("reap_monthly_active_users", _reap_users)

    @cachedInlineCallbacks(num_args=0)
    def get_monthly_active_count(self):
        """
            Generates current count of monthly active users.abs
            Return:
                Defered(int): Number of current monthly active users
        """
        def _count_users(txn):
            sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"

            txn.execute(sql)
            count, = txn.fetchone()
            return count
        return self.runInteraction("count_users", _count_users)

    def upsert_monthly_active_user(self, user_id):
        """
            Updates or inserts monthly active user member
            Arguments:
                user_id (str): user to add/update
            Deferred(bool): True if a new entry was created, False if an
                existing one was updated.
        """
        return self._simple_upsert(
            desc="upsert_monthly_active_user",
            table="monthly_active_users",
            keyvalues={
                "user_id": user_id,
            },
            values={
                "timestamp": int(self._clock.time_msec()),
            },
            lock=False,
        )

    @cachedInlineCallbacks(num_args=1)
    def is_user_monthly_active(self, user_id):
        """
            Checks if a given user is part of the monthly active user group
            Arguments:
                user_id (str): user to add/update
            Return:
                bool : True if user part of group, False otherwise
        """
        user_present = yield self._simple_select_onecol(
            table="monthly_active_users",
            keyvalues={
                "user_id": user_id,
            },
            retcol="user_id",
            desc="is_user_monthly_active",
        )

        defer.returnValue(bool(user_present))