summary refs log blame commit diff
path: root/synapse/replication/tcp/streams.py
blob: 4c60bf79f98942a3494c4dd749d63e70ca68139a (plain) (tree)




































                                                                                      




























































                                                           
                                                                               










                                                                 




                                                                    




                                                 























                                                                               





                                                                                    































































                                                                               
                                                                                






































































































































































































































                                                                                      













                                                                         











                                                           















                             
                                
                          
     
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Defines all the valid streams that clients can subscribe to, and the format
of the rows returned by each stream.

Each stream is defined by the following information:

    stream name:        The name of the stream
    row type:           The type that is used to serialise/deserialise the row
    current_token:      The function that returns the current token for the stream
    update_function:    The function that returns a list of updates between two tokens
"""

from twisted.internet import defer
from collections import namedtuple

import logging


logger = logging.getLogger(__name__)


# Stream.get_updates_since asks limited update functions for at most
# MAX_EVENTS_BEHIND + 1 rows and raises once MAX_EVENTS_BEHIND rows come back.
MAX_EVENTS_BEHIND = 10000


# Row types for each stream. Every namedtuple's typename matches the variable
# it is bound to, so reprs are unambiguous and pickle can find the type by name
# (two of these previously reused the names of the Stream subclasses below).

EventStreamRow = namedtuple("EventStreamRow", (
    "event_id",  # str
    "room_id",  # str
    "type",  # str
    "state_key",  # str, optional
    "redacts",  # str, optional
))
BackfillStreamRow = namedtuple("BackfillStreamRow", (
    "event_id",  # str
    "room_id",  # str
    "type",  # str
    "state_key",  # str, optional
    "redacts",  # str, optional
))
PresenceStreamRow = namedtuple("PresenceStreamRow", (
    "user_id",  # str
    "state",  # str
    "last_active_ts",  # int
    "last_federation_update_ts",  # int
    "last_user_sync_ts",  # int
    "status_msg",   # str
    "currently_active",  # bool
))
TypingStreamRow = namedtuple("TypingStreamRow", (
    "room_id",  # str
    "user_ids",  # list(str)
))
ReceiptsStreamRow = namedtuple("ReceiptsStreamRow", (
    "room_id",  # str
    "receipt_type",  # str
    "user_id",  # str
    "event_id",  # str
    "data",  # dict
))
PushRulesStreamRow = namedtuple("PushRulesStreamRow", (
    "user_id",  # str
))
PushersStreamRow = namedtuple("PushersStreamRow", (
    "user_id",  # str
    "app_id",  # str
    "pushkey",  # str
    "deleted",  # bool
))
CachesStreamRow = namedtuple("CachesStreamRow", (
    "cache_func",  # str
    "keys",  # list(str)
    "invalidation_ts",  # int
))
PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow", (
    "room_id",  # str
    "visibility",  # str
    "appservice_id",  # str, optional
    "network_id",  # str, optional
))
DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", (
    "user_id",  # str
    "destination",  # str
))
ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", (
    "entity",  # str
))
FederationStreamRow = namedtuple("FederationStreamRow", (
    "type",  # str, the type of data as defined in the BaseFederationRows
    "data",  # dict, serialization of a federation.send_queue.BaseFederationRow
))
TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", (
    "user_id",  # str
    "room_id",  # str
    "data",  # dict
))
AccountDataStreamRow = namedtuple("AccountDataStreamRow", (
    "user_id",  # str
    "room_id",  # str, None for global account data
    "data_type",  # str
    "data",  # dict
))
CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStreamRow", (
    "room_id",  # str
    "type",  # str
    "state_key",  # str
    "event_id",  # str, optional
))
GroupsStreamRow = namedtuple("GroupsStreamRow", (
    "group_id",  # str
    "user_id",  # str
    "type",  # str
    "content",  # dict
))


class Stream(object):
    """Base class for the streams.

    Provides a `get_updates()` function that returns new updates since the last
    time it was called up until the point `advance_current_token` was called.

    Subclasses set ``NAME`` and ``ROW_TYPE`` and provide ``current_token`` and
    ``update_function``. They do this either by overriding the methods or by
    assigning callables to the instance. ``Stream.__init__`` calls
    ``current_token()`` immediately, so any instance assignment must happen
    before the base constructor runs.
    """
    NAME = None  # The name of the stream
    ROW_TYPE = None  # The type of the row
    _LIMITED = True  # Whether the update function takes a limit

    def __init__(self, hs):
        # The token from which we last asked for updates
        self.last_token = self.current_token()

        # The token that we will get updates up to
        self.upto_token = self.current_token()

    def advance_current_token(self):
        """Updates `upto_token` to "now", which updates up until which point
        get_updates[_since] will fetch rows till.
        """
        self.upto_token = self.current_token()

    def discard_updates_and_advance(self):
        """Called when the stream should advance but the updates would be discarded,
        e.g. when there are no currently connected workers.

        Moves both `upto_token` and `last_token` to the current token, so the
        skipped rows are never returned by `get_updates`.
        """
        self.upto_token = self.current_token()
        self.last_token = self.upto_token

    @defer.inlineCallbacks
    def get_updates(self):
        """Gets all updates since the last time this function was called (or
        since the stream was constructed if it hadn't been called before),
        until the `upto_token`

        Returns:
            (list(ROW_TYPE), int): list of updates plus the token used as an
                upper bound of the updates (i.e. the "current token")
        """
        updates, current_token = yield self.get_updates_since(self.last_token)
        self.last_token = current_token

        defer.returnValue((updates, current_token))

    @defer.inlineCallbacks
    def get_updates_since(self, from_token):
        """Like get_updates except allows specifying from when we should
        stream updates

        Args:
            from_token (int|str): the token to stream from. The strings "NOW"
                and "now" mean "no backlog". Anything else is passed to int().

        Returns:
            (list(ROW_TYPE), int): list of updates plus the token used as an
                upper bound of the updates (i.e. the "current token")

        Raises:
            Exception: if the stream is limited and at least
                MAX_EVENTS_BEHIND rows are pending.
        """
        if from_token in ("NOW", "now"):
            defer.returnValue(([], self.upto_token))

        current_token = self.upto_token

        from_token = int(from_token)

        if from_token == current_token:
            defer.returnValue(([], current_token))

        if self._LIMITED:
            # Ask for one more row than the threshold so that reaching the
            # threshold can be detected without fetching an unbounded backlog.
            rows = yield self.update_function(
                from_token, current_token,
                limit=MAX_EVENTS_BEHIND + 1,
            )

            if len(rows) >= MAX_EVENTS_BEHIND:
                raise Exception("stream %s has fallen behind" % (self.NAME))
        else:
            rows = yield self.update_function(
                from_token, current_token,
            )

        # The first element of each row is its stream token; the rest of the
        # row constructs the ROW_TYPE instance.
        updates = [(row[0], self.ROW_TYPE(*row[1:])) for row in rows]

        defer.returnValue((updates, current_token))

    def current_token(self):
        """Gets the current token of the underlying streams. Should be provided
        by the sub classes

        Returns:
            int
        """
        raise NotImplementedError()

    def update_function(self, from_token, current_token, limit=None):
        """Get updates between from_token and current_token. If
        Stream._LIMITED is True then limit is provided, otherwise it's not.

        Returns:
            Deferred(list(tuple)): the first entry in the tuple is the token for
                that update, and the rest of the tuple gets used to construct
                a ``ROW_TYPE`` instance
        """
        raise NotImplementedError()


class EventsStream(Stream):
    """A new event arrived, or a previously-outlier event stopped being an
    outlier.
    """
    ROW_TYPE = EventStreamRow
    NAME = "events"

    def __init__(self, hs):
        datastore = hs.get_datastore()
        # Bound store methods stand in for the base-class hooks; they must be
        # in place before Stream.__init__ reads the current token.
        self.update_function = datastore.get_all_new_forward_event_rows
        self.current_token = datastore.get_current_events_token

        super(EventsStream, self).__init__(hs)


class BackfillStream(Stream):
    """Old events were fetched that were either never seen before or were
    previously outliers.
    """
    ROW_TYPE = BackfillStreamRow
    NAME = "backfill"

    def __init__(self, hs):
        datastore = hs.get_datastore()
        # Hooks must be set before Stream.__init__ reads the current token.
        self.update_function = datastore.get_all_new_backfill_event_rows
        self.current_token = datastore.get_current_backfill_token

        super(BackfillStream, self).__init__(hs)


class PresenceStream(Stream):
    """Presence changes.

    The token comes from the datastore, but the rows come from the presence
    handler. The update function is called without a limit.
    """
    ROW_TYPE = PresenceStreamRow
    _LIMITED = False
    NAME = "presence"

    def __init__(self, hs):
        datastore = hs.get_datastore()
        handler = hs.get_presence_handler()

        self.update_function = handler.get_all_presence_updates
        self.current_token = datastore.get_current_presence_token

        super(PresenceStream, self).__init__(hs)


class TypingStream(Stream):
    """Typing notifications.

    Both the token and the rows come from the typing handler. The update
    function is called without a limit.
    """
    ROW_TYPE = TypingStreamRow
    _LIMITED = False
    NAME = "typing"

    def __init__(self, hs):
        handler = hs.get_typing_handler()

        self.update_function = handler.get_all_typing_updates
        self.current_token = handler.get_current_token

        super(TypingStream, self).__init__(hs)


class ReceiptsStream(Stream):
    """Receipts were updated.
    """
    ROW_TYPE = ReceiptsStreamRow
    NAME = "receipts"

    def __init__(self, hs):
        datastore = hs.get_datastore()

        self.update_function = datastore.get_all_updated_receipts
        self.current_token = datastore.get_max_receipt_stream_id

        super(ReceiptsStream, self).__init__(hs)


class PushRulesStream(Stream):
    """Push rules of some user were modified.
    """
    ROW_TYPE = PushRulesStreamRow
    NAME = "push_rules"

    def __init__(self, hs):
        self.store = hs.get_datastore()
        super(PushRulesStream, self).__init__(hs)

    def current_token(self):
        # The store hands back a pair; the first element is the position of
        # the push rules stream and the second is not needed here.
        rules_token, _unused = self.store.get_push_rules_stream_token()
        return rules_token

    @defer.inlineCallbacks
    def update_function(self, from_token, to_token, limit):
        all_rows = yield self.store.get_all_push_rule_updates(
            from_token, to_token, limit,
        )
        # Keep only the stream id (index 0) and the column at index 2, which
        # becomes PushRulesStreamRow.user_id.
        updates = [(rule_row[0], rule_row[2]) for rule_row in all_rows]
        defer.returnValue(updates)


class PushersStream(Stream):
    """A pusher was added, modified or deleted for some user.
    """
    ROW_TYPE = PushersStreamRow
    NAME = "pushers"

    def __init__(self, hs):
        datastore = hs.get_datastore()

        self.update_function = datastore.get_all_updated_pushers_rows
        self.current_token = datastore.get_pushers_stream_token

        super(PushersStream, self).__init__(hs)


class CachesStream(Stream):
    """The master invalidated a cache that no other stream would invalidate on
    the workers.
    """
    ROW_TYPE = CachesStreamRow
    NAME = "caches"

    def __init__(self, hs):
        datastore = hs.get_datastore()

        self.update_function = datastore.get_all_updated_caches
        self.current_token = datastore.get_cache_stream_token

        super(CachesStream, self).__init__(hs)


class PublicRoomsStream(Stream):
    """The list of public rooms was modified.
    """
    ROW_TYPE = PublicRoomsStreamRow
    NAME = "public_rooms"

    def __init__(self, hs):
        datastore = hs.get_datastore()

        self.update_function = datastore.get_all_new_public_rooms
        self.current_token = datastore.get_current_public_room_stream_id

        super(PublicRoomsStream, self).__init__(hs)


class DeviceListsStream(Stream):
    """A device was added, modified or removed.

    The update function is called without a limit.
    """
    ROW_TYPE = DeviceListsStreamRow
    _LIMITED = False
    NAME = "device_lists"

    def __init__(self, hs):
        datastore = hs.get_datastore()

        self.update_function = datastore.get_all_device_list_changes_for_remotes
        self.current_token = datastore.get_device_stream_token

        super(DeviceListsStream, self).__init__(hs)


class ToDeviceStream(Stream):
    """to_device messages arrived for a client.
    """
    ROW_TYPE = ToDeviceStreamRow
    NAME = "to_device"

    def __init__(self, hs):
        datastore = hs.get_datastore()

        self.update_function = datastore.get_all_new_device_messages
        self.current_token = datastore.get_to_device_stream_token

        super(ToDeviceStream, self).__init__(hs)


class FederationStream(Stream):
    """Data waiting to be sent over federation.

    Only available when federation sending is disabled on the master.
    Both the token and the rows come from the federation sender.
    """
    ROW_TYPE = FederationStreamRow
    NAME = "federation"

    def __init__(self, hs):
        sender = hs.get_federation_sender()

        self.update_function = sender.get_replication_rows
        self.current_token = sender.get_current_token

        super(FederationStream, self).__init__(hs)


class TagAccountDataStream(Stream):
    """A room tag was added or removed.

    The token is read with the same store method that AccountDataStream uses.
    """
    ROW_TYPE = TagAccountDataStreamRow
    NAME = "tag_account_data"

    def __init__(self, hs):
        datastore = hs.get_datastore()

        self.update_function = datastore.get_all_updated_tags
        self.current_token = datastore.get_max_account_data_stream_id

        super(TagAccountDataStream, self).__init__(hs)


class AccountDataStream(Stream):
    """Global or per room account data was changed
    """
    NAME = "account_data"
    ROW_TYPE = AccountDataStreamRow

    def __init__(self, hs):
        self.store = hs.get_datastore()

        self.current_token = self.store.get_max_account_data_stream_id

        super(AccountDataStream, self).__init__(hs)

    @defer.inlineCallbacks
    def update_function(self, from_token, to_token, limit):
        """Fetch global and per-room account data changes between from_token
        and to_token.

        Args:
            from_token (int): stream position to fetch from. It is used as the
                starting point for both the global and the room queries.
            to_token (int): stream position to fetch up to.
            limit (int): passed through to the store.

        Returns:
            Deferred(list(tuple)): rows of
                (stream_id, user_id, room_id, data_type, data), sorted by
                stream_id. room_id is None for global account data.
        """
        global_results, room_results = yield self.store.get_all_updated_account_data(
            from_token, from_token, to_token, limit
        )

        results = list(room_results)
        # Global rows have no room; pad them with a None room_id so they fit
        # AccountDataStreamRow.
        results.extend(
            (stream_id, user_id, None, account_data_type, content,)
            for stream_id, user_id, account_data_type, content in global_results
        )

        # The two result sets come from separate queries. Merge them into
        # stream-id order so consumers never see tokens go backwards within a
        # batch. The sort is stable, so rows sharing an id keep their order.
        results.sort(key=lambda row: row[0])

        defer.returnValue(results)


class CurrentStateDeltaStream(Stream):
    """The current state of a room changed.
    """
    ROW_TYPE = CurrentStateDeltaStreamRow
    NAME = "current_state_deltas"

    def __init__(self, hs):
        datastore = hs.get_datastore()

        self.update_function = datastore.get_all_updated_current_state_deltas
        self.current_token = datastore.get_max_current_state_delta_stream_id

        super(CurrentStateDeltaStream, self).__init__(hs)


class GroupServerStream(Stream):
    """Changes to groups, as reported by the store's group change feed.
    """
    ROW_TYPE = GroupsStreamRow
    NAME = "groups"

    def __init__(self, hs):
        datastore = hs.get_datastore()

        self.update_function = datastore.get_all_groups_changes
        self.current_token = datastore.get_group_stream_token

        super(GroupServerStream, self).__init__(hs)


# Maps each stream's NAME to the Stream subclass that implements it.
STREAMS_MAP = dict(
    (stream_cls.NAME, stream_cls)
    for stream_cls in (
        EventsStream,
        BackfillStream,
        PresenceStream,
        TypingStream,
        ReceiptsStream,
        PushRulesStream,
        PushersStream,
        CachesStream,
        PublicRoomsStream,
        DeviceListsStream,
        ToDeviceStream,
        FederationStream,
        TagAccountDataStream,
        AccountDataStream,
        CurrentStateDeltaStream,
        GroupServerStream,
    )
)