summary refs log tree commit diff
path: root/synapse/storage/stats.py
blob: f6fc56cbe574ea809ceca707d7bc53560cbe69b6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# -*- coding: utf-8 -*-
# Copyright 2018 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from .StateDeltasStore import StateDeltasStore

logger = logging.getLogger(__name__)

# These fields track relative numbers (e.g. number of events sent in this
# timeslice).
#
# Each value must be a real tuple: note the trailing comma. Without it,
# `("sent_events")` is just a parenthesised string, so iterating it yields
# single characters and `in` performs substring matching.
RELATIVE_STATS_FIELDS = {
    "room": (
        "sent_events",
    ),
    "user": (
        "sent_events",
    ),
}

# These fields track absolute numbers rather than relative ones (e.g. total
# number of rooms on the server). Keyed by stats type; each value is the tuple
# of column names of that type which hold absolute counters.
ABSOLUTE_STATS_FIELDS = dict(
    room=(
        "current_state_events",
        "joined_members",
        "invited_members",
        "left_members",
        "banned_members",
        "state_events",
        "local_events",
        "remote_events",
    ),
    user=(
        "local_events",
        "public_rooms",
        "private_rooms",
        "sent_file_count",
        "sent_file_size",
    ),
)


class StatsStore(StateDeltasStore):
    """Storage methods for the stats stream position and the room/user stats
    tables.

    Every method delegates to a helper reached through ``self``
    (``_simple_*``, ``runInteraction``, ``cursor_to_dict``) and returns
    whatever that helper returns.
    """

    def get_stats_stream_pos(self):
        """Fetch the ``stream_id`` column from the ``stats_stream_pos`` table.

        No key columns are supplied, so the lookup is not restricted to any
        particular row.
        """
        return self._simple_select_one_onecol(
            table="stats_stream_pos",
            keyvalues={},
            retcol="stream_id",
            desc="stats_stream_pos",
        )

    def update_stats_stream_pos(self, stream_id):
        """Set the ``stream_id`` column of the ``stats_stream_pos`` table.

        Args:
            stream_id: the new value to store.
        """
        return self._simple_update_one(
            table="stats_stream_pos",
            keyvalues={},
            updatevalues={"stream_id": stream_id},
            desc="update_stats_stream_pos",
        )

    def update_room_state(self, room_id, fields):
        """Upsert the ``room_state`` row keyed by `room_id`.

        Args:
            room_id: value of the ``room_id`` key column.
            fields (dict): column name -> value to store on that row.
        """
        return self._simple_upsert(
            table="room_state",
            keyvalues={
                "room_id": room_id,
            },
            values=fields,
            desc="update_room_state",
        )

    def update_stats(self, stats_type, stats_id, ts, fields):
        """Upsert the ``<stats_type>_stats`` row keyed by id and `ts`.

        Args:
            stats_type (str): prefix selecting both the table
                (``<stats_type>_stats``) and its id column
                (``<stats_type>_id``).
            stats_id: value of the id column.
            ts: value of the ``ts`` key column.
            fields (dict): column name -> value to store on that row.
        """
        return self._simple_upsert(
            table=("%s_stats" % stats_type),
            keyvalues={
                ("%s_id" % stats_type): stats_id,
                "ts": ts,
            },
            # passed as `values`, the same keyword update_room_state uses for
            # this helper.
            values=fields,
            desc="update_stats",
        )

    def update_stats_delta(self, ts, bucket_size, stats_type, stats_id, field, value):
        """Add `value` to one column of the stats row for `stats_id` at `ts`.

        The most recent existing row for `stats_id` is looked up first. If
        there is none, nothing is written. If that row's ``ts`` differs from
        `ts`, a new row is inserted for `ts`, carrying over the columns listed
        in ``ABSOLUTE_STATS_FIELDS[stats_type]``. Finally `value` is added to
        `field` on the row for `ts`.

        Args:
            ts: value of the ``ts`` column identifying the row to update.
            bucket_size: stored in the ``bucket_size`` column when a new row
                is created.
            stats_type (str): prefix selecting the table and id column; must
                be a key of ``ABSOLUTE_STATS_FIELDS``.
            stats_id: value of the id column.
            field (str): name of the column to adjust. It is interpolated
                directly into the SQL text, so it must be a trusted column
                name and never caller-controlled input.
            value: the amount to add to `field`.

        Returns:
            The result of ``runInteraction``.
        """
        def _update_stats_delta(txn):
            table = "%s_stats" % stats_type
            id_col = "%s_id" % stats_type

            # fetch the row with the greatest ts for this id.
            sql = (
                "SELECT * FROM %s"
                " WHERE %s=? and ts=("
                "  SELECT MAX(ts) FROM %s"
                "  WHERE %s=?"
                ")"
            ) % (table, id_col, table, id_col)
            txn.execute(sql, (stats_id, stats_id))
            rows = self.cursor_to_dict(txn)
            if not rows:
                # silently skip as we don't have anything to apply a delta to yet.
                # this tries to minimise any race between the initial sync and
                # subsequent deltas arriving.
                return

            latest_row = rows[0]

            if ts != latest_row["ts"]:
                # we have to copy our absolute counters over to the new entry.
                values = {
                    key: latest_row[key]
                    for key in ABSOLUTE_STATS_FIELDS[stats_type]
                }
                values[id_col] = stats_id
                values["ts"] = ts
                values["bucket_size"] = bucket_size

                self._simple_insert_txn(
                    txn,
                    table=table,
                    values=values
                )

            # actually apply the delta.
            #
            # NOTE(review): this used to be guarded by
            # `if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]`, with an
            # overwrite (`SET field = value`) in the true branch. That test
            # compares a stats type against field names, so it was never true
            # and every call took this additive path. The dead branch has been
            # dropped rather than "repaired" to `field in ...`: `value` is a
            # delta, and the absolute counters have just been carried over, so
            # adding to them is what keeps them correct. Confirm against the
            # callers before introducing an overwrite path.
            sql = (
                "UPDATE %s "
                " SET %s=%s+?"
                " WHERE %s=? AND ts=?"
            ) % (table, field, field, id_col)
            txn.execute(sql, (value, stats_id, ts))

        return self.runInteraction(
            "update_stats_delta", _update_stats_delta
        )