path: root/synapse/api/streams/event.py
# -*- coding: utf-8 -*-
# Copyright 2014 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This module contains classes for streaming from the event stream: /events.
"""
from twisted.internet import defer

from synapse.api.errors import EventStreamError
from synapse.api.events import SynapseEvent
from synapse.api.streams import PaginationStream, StreamData

import logging

logger = logging.getLogger(__name__)


class EventsStreamData(StreamData):
    EVENT_TYPE = "EventsStream"

    def __init__(self, hs, room_id=None, feedback=False):
        super(EventsStreamData, self).__init__(hs)
        self.room_id = room_id
        self.with_feedback = feedback

    @defer.inlineCallbacks
    def get_rows(self, user_id, from_key, to_key, limit, direction):
        data, latest_ver = yield self.store.get_room_events(
            user_id=user_id,
            from_key=from_key,
            to_key=to_key,
            limit=limit,
            room_id=self.room_id,
            with_feedback=self.with_feedback
        )
        defer.returnValue((data, latest_ver))

    @defer.inlineCallbacks
    def max_token(self):
        val = yield self.store.get_room_events_max_id()
        defer.returnValue(val)

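# Illustrative only: an EventsStreamData is constructed per request, e.g.
# (the room id below is hypothetical):
#
#   stream_data = EventsStreamData(hs, room_id="!abc:example.org",
#                                  feedback=True)
#
# get_rows() then proxies to the data store with those filters applied.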

class EventStream(PaginationStream):

    SEPARATOR = '_'

    def __init__(self, user_id, stream_data_list):
        super(EventStream, self).__init__()
        self.user_id = user_id
        self.stream_data = stream_data_list

    @defer.inlineCallbacks
    def fix_tokens(self, pagination_config):
        pagination_config.from_tok = yield self.fix_token(
            pagination_config.from_tok)
        pagination_config.to_tok = yield self.fix_token(
            pagination_config.to_tok)

        if (
            not pagination_config.to_tok
            and pagination_config.direction == 'f'
        ):
            pagination_config.to_tok = yield self.get_current_max_token()

        logger.debug("pagination_config: %s", pagination_config)

        defer.returnValue(pagination_config)

    @defer.inlineCallbacks
    def fix_token(self, token):
        """Fixes unknown values in a token to known values.

        Args:
            token (str): The token to fix up.
        Returns:
            The fixed-up token, which may == token.
        """
        if token == PaginationStream.TOK_END:
            new_token = yield self.get_current_max_token()

            logger.debug("fix_token: From %s to %s", token, new_token)

            token = new_token

        defer.returnValue(token)

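    # Illustrative only: if the incoming token is the PaginationStream.TOK_END
    # sentinel, fix_token() replaces it with the current max token, e.g.
    # "12_5" for two streams; any other token value is returned unchanged.
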
    @defer.inlineCallbacks
    def get_current_max_token(self):
        new_token_parts = []
        for s in self.stream_data:
            mx = yield s.max_token()
            new_token_parts.append(str(mx))

        new_token = EventStream.SEPARATOR.join(new_token_parts)

        logger.debug("get_current_max_token: %s", new_token)

        defer.returnValue(new_token)

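    # Illustrative only: with two StreamData sources whose max_token() values
    # are 12 and 5, get_current_max_token() yields "12" + SEPARATOR + "5",
    # i.e. "12_5": one segment per entry in self.stream_data, in order.
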
    @defer.inlineCallbacks
    def get_chunk(self, config):
        # No support for a limit across more than one stream: a single limit
        # cannot be meaningfully split between multiplexed streams.
        if config.limit and len(self.stream_data) > 1:
            raise EventStreamError(
                400, "Limit not supported on multiplexed streams."
            )

        chunk_data, next_tok = yield self._get_chunk_data(
            config.from_tok,
            config.to_tok,
            config.limit,
            config.direction,
        )

        defer.returnValue({
            "chunk": chunk_data,
            "start": config.from_tok,
            "end": next_tok
        })

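    # Illustrative only: a successful get_chunk() resolves to a dict such as
    #
    #   {"chunk": [...], "start": "10_3", "end": "12_5"}
    #
    # where "end" is the token a caller would pass as from_tok when polling
    # for the next batch.
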
    @defer.inlineCallbacks
    def _get_chunk_data(self, from_tok, to_tok, limit, direction):
        """ Get event data between the two tokens.

        Tokens are SEPARATOR separated values representing pkey values of
        certain tables, and the position determines the StreamData invoked
        according to the STREAM_DATA list.

        The magic value '-1' can be used to get the latest value.

        Args:
            from_tok - The token to start from.
            to_tok - The token to end at. Must have values > from_tok or be -1.
        Returns:
            A list of event data.
        Raises:
            EventStreamError if something went wrong.
        """
        # sanity check: both tokens must have one pkey segment per stream
        if from_tok is not None and to_tok is not None:
            sep_count = from_tok.count(EventStream.SEPARATOR)
            if (sep_count != to_tok.count(EventStream.SEPARATOR)
                    or sep_count + 1 != len(self.stream_data)):
                raise EventStreamError(400, "Token lengths don't match.")

        chunk = []
        next_ver = []
        for i, (from_pkey, to_pkey) in enumerate(zip(
            self._split_token(from_tok),
            self._split_token(to_tok)
        )):
            if from_pkey == to_pkey:
                # tokens are the same, we have nothing to do.
                next_ver.append(str(to_pkey))
                continue

            (event_chunk, max_pkey) = yield self.stream_data[i].get_rows(
                self.user_id, from_pkey, to_pkey, limit, direction,
            )

            chunk.extend([
                e.get_dict() if isinstance(e, SynapseEvent) else e
                for e in event_chunk
            ])
            next_ver.append(str(max_pkey))

        defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver)))

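    # Illustrative only: for from_tok "10_3" and to_tok "12_5" with two
    # stream data sources, the zip above pairs ("10", "12") with
    # stream_data[0] and ("3", "5") with stream_data[1]; equal pairs are
    # skipped, and the new max pkeys are re-joined into the next token.
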
    def _split_token(self, token):
        """Splits the given token into a list of pkeys.

        Args:
            token (str): The token with SEPARATOR values.
        Returns:
            A list of ints.
        """
        if token:
            segments = token.split(EventStream.SEPARATOR)
        else:
            segments = [None] * len(self.stream_data)
        return segments
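
# A minimal, self-contained sketch of the token round-trip used above
# (illustrative, not part of the original module):
#
#   parts = ["12", "5"]                        # one max pkey per stream
#   token = EventStream.SEPARATOR.join(parts)  # -> "12_5"
#   assert token.split(EventStream.SEPARATOR) == parts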