summary refs log tree commit diff
path: root/synapse/storage/_base.py
blob: fd5bb3e1deb1763a6cd59bf6eed3b611bde171b9 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2017-2018 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import random

from six import PY2
from six.moves import builtins

from canonicaljson import json

from synapse.storage.database import LoggingTransaction  # noqa: F401
from synapse.storage.database import make_in_list_sql_clause  # noqa: F401
from synapse.storage.database import Database
from synapse.types import get_domain_from_id

logger = logging.getLogger(__name__)


class SQLBaseStore(object):
    """Shared state and cache-invalidation helpers for storage classes.

    Instances carry the homeserver, its clock and database engine, a
    `Database` built from the homeserver, and a `random.SystemRandom`.
    """

    def __init__(self, db_conn, hs):
        """
        Args:
            db_conn: A database connection. This constructor does not use it;
                it is accepted as part of the constructor signature.
            hs: The homeserver object. Supplies the clock
                (`hs.get_clock()`) and `hs.database_engine`, and is passed
                to `Database`.
        """
        self.hs = hs
        self._clock = hs.get_clock()
        self.database_engine = hs.database_engine
        self.db = Database(hs)
        self.rand = random.SystemRandom()

    def _invalidate_state_caches(self, room_id, members_changed):
        """Invalidates caches that are based on the current state, but does
        not stream invalidations down replication.

        Args:
            room_id (str): Room where state changed
            members_changed (iterable[str]): The user_ids of members that have
                changed
        """
        # Several changed members may share a host; de-duplicate so each
        # (room, host) entry is only invalidated once.
        changed_hosts = {get_domain_from_id(user_id) for user_id in members_changed}
        for host in changed_hosts:
            self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))
            self._attempt_to_invalidate_cache("was_host_joined", (room_id, host))

        self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
        self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
        self._attempt_to_invalidate_cache("get_current_state_ids", (room_id,))

    def _attempt_to_invalidate_cache(self, cache_name, key):
        """Attempts to invalidate the cache of the given name, ignoring if the
        cache doesn't exist. Mainly used for invalidating caches on workers,
        where they may not have the cache.

        Only the attribute lookup is guarded: if the cache exists, any error
        raised while invalidating it (including an AttributeError from inside
        `invalidate`) propagates to the caller rather than being mistaken for
        a missing cache.

        Args:
            cache_name (str)
            key (tuple)
        """
        try:
            cache = getattr(self, cache_name)
        except AttributeError:
            # We probably haven't pulled in the cache in this worker,
            # which is fine.
            return

        cache.invalidate(key)


def db_to_json(db_content):
    """
    Take some data from a database row and return a JSON-decoded object.

    Args:
        db_content (memoryview|buffer|bytes|bytearray|unicode)

    Returns:
        The object produced by ``json.loads`` for the given content.

    Raises:
        Exception: any error raised by ``json.loads`` is logged at warning
            level and then re-raised unchanged. Errors from the UTF-8 decode
            step happen before the logging guard and propagate without being
            logged.
    """
    if isinstance(db_content, memoryview):
        # psycopg2 on Python 3 returns memoryview objects, which we need to
        # cast to bytes to decode
        db_content = db_content.tobytes()
    elif PY2 and isinstance(db_content, builtins.buffer):
        # psycopg2 on Python 2 returns buffer objects, which we need to cast
        # to bytes to decode
        db_content = bytes(db_content)

    # Decode it to a Unicode string before feeding it to json.loads, so we
    # consistently get a Unicode-containing object out.
    if isinstance(db_content, (bytes, bytearray)):
        db_content = db_content.decode("utf8")

    try:
        return json.loads(db_content)
    except Exception:
        # Log via the module logger (not the root logger) so the record is
        # attributed to this module and respects its logging configuration.
        logger.warning("Tried to decode '%r' as JSON and failed", db_content)
        raise