summary refs log tree commit diff
path: root/synapse/handlers/sliding_sync/store.py
blob: d24fccf76f6d4e8ef61b47b18888e5ed015353a1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#

import logging
from typing import TYPE_CHECKING, Optional

import attr

from synapse.logging.opentracing import trace
from synapse.storage.databases.main import DataStore
from synapse.types import SlidingSyncStreamToken
from synapse.types.handlers.sliding_sync import (
    MutablePerConnectionState,
    PerConnectionState,
    SlidingSyncConfig,
)

if TYPE_CHECKING:
    pass

logger = logging.getLogger(__name__)


@attr.s(auto_attribs=True)
class SlidingSyncConnectionStore:
    """In-memory store of per-connection state, including what rooms we have
    previously sent down a sliding sync connection.

    Note: This is NOT safe to run in a worker setup because connection positions will
    point to different sets of rooms on different workers. e.g. for the same connection,
    a connection position of 5 might have totally different states on worker A and
    worker B.

    One complication that we need to deal with here is needing to handle requests being
    resent, i.e. if we sent down a room in a response that the client received, we must
    consider the room *not* sent when we get the request again.

    This is handled by using an integer "token", which is returned to the client
    as part of the sync token. For each connection we store a mapping from
    tokens to the room states, and create a new entry when we send down new
    rooms.

    Note that for any given sliding sync connection we will only store a maximum
    of two different tokens: the previous token from the request and a new token
    sent in the response. When we receive a request with a given token, we then
    clear out all other entries with a different token.

    Attributes:
        _connections: Mapping from `(user_id, conn_id)` to mapping of `token`
            to mapping of room ID to `HaveSentRoom`.
    """

    store: "DataStore"

    async def get_and_clear_connection_positions(
        self,
        sync_config: SlidingSyncConfig,
        from_token: Optional[SlidingSyncStreamToken],
    ) -> PerConnectionState:
        """Fetch the per-connection state for the token.

        Raises:
            SlidingSyncUnknownPosition if the connection_token is unknown
        """
        # If this is our first request, there is no previous connection state to fetch out of the database
        if from_token is None or from_token.connection_position == 0:
            return PerConnectionState()

        conn_id = sync_config.conn_id or ""

        device_id = sync_config.requester.device_id
        assert device_id is not None

        return await self.store.get_and_clear_connection_positions(
            sync_config.user.to_string(),
            device_id,
            conn_id,
            from_token.connection_position,
        )

    @trace
    async def record_new_state(
        self,
        sync_config: SlidingSyncConfig,
        from_token: Optional[SlidingSyncStreamToken],
        new_connection_state: MutablePerConnectionState,
    ) -> int:
        """Record updated per-connection state, returning the connection
        position associated with the new state.
        If there are no changes to the state this may return the same token as
        the existing per-connection state.
        """
        if not new_connection_state.has_updates():
            if from_token is not None:
                return from_token.connection_position
            else:
                return 0

        # A from token with a zero connection position means there was no
        # previously stored connection state, so we treat a zero the same as
        # there being no previous position.
        previous_connection_position = None
        if from_token is not None and from_token.connection_position != 0:
            previous_connection_position = from_token.connection_position

        conn_id = sync_config.conn_id or ""

        device_id = sync_config.requester.device_id
        assert device_id is not None

        return await self.store.persist_per_connection_state(
            sync_config.user.to_string(),
            device_id,
            conn_id,
            previous_connection_position,
            new_connection_state,
        )