# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# NOTE(review): this docstring previously sat *after* the imports, which made
# it a no-op string expression rather than the module's __doc__. Moved to the
# top so it is the real module docstring.
"""Handling of the 'events' replication stream

This stream contains rows of various types. Each row therefore contains a 'type'
identifier before the real data. For example::

    RDATA events batch ["state", ["!room:id", "m.type", "", "$event:id"]]
    RDATA events 12345 ["ev", ["$event:id", "!room:id", "m.type", null, null]]

An "ev" row is sent for each new event. The fields in the data part are:

 * The new event id
 * The room id for the event
 * The type of the new event
 * The state key of the event, for state events
 * The event id of an event which is redacted by this event.

A "state" row is sent whenever the "current state" in a room changes. The fields
in the data part are:

 * The room id for the state change
 * The event type of the state which has changed
 * The state_key of the state which has changed
 * The event id of the new state
"""

import heapq
from typing import Iterable, Tuple, Type

import attr

from ._base import Stream, Token, db_query_to_update_function


@attr.s(slots=True, frozen=True)
class EventsStreamRow(object):
    """A parsed row from the events replication stream.

    Wraps one of the concrete row types below, tagged with its TypeId so the
    receiving side can dispatch on it.
    """

    type = attr.ib()  # str: the TypeId of one of the *EventsStreamRows
    data = attr.ib()  # BaseEventsStreamRow


class BaseEventsStreamRow(object):
    """Base class for rows to be sent in the events stream.

    Specifies how to identify, serialize and deserialize the different types.
    """

    # Unique string that ids the type. Must be overridden in sub classes.
    TypeId = None  # type: str

    @classmethod
    def from_data(cls, data):
        """Parse the data from the replication stream into a row.

        By default we just call the constructor with the data list as arguments

        Args:
            data: The value of the data object from the replication stream
        """
        return cls(*data)


@attr.s(slots=True, frozen=True)
class EventsStreamEventRow(BaseEventsStreamRow):
    """An "ev" row: a single new event."""

    TypeId = "ev"

    event_id = attr.ib()  # str
    room_id = attr.ib()  # str
    type = attr.ib()  # str
    state_key = attr.ib()  # str, optional
    redacts = attr.ib()  # str, optional
    relates_to = attr.ib()  # str, optional


@attr.s(slots=True, frozen=True)
class EventsStreamCurrentStateRow(BaseEventsStreamRow):
    """A "state" row: a change to the current state of a room."""

    TypeId = "state"

    room_id = attr.ib()  # str
    type = attr.ib()  # str
    state_key = attr.ib()  # str
    event_id = attr.ib()  # str, optional


_EventRows = (
    EventsStreamEventRow,
    EventsStreamCurrentStateRow,
)  # type: Tuple[Type[BaseEventsStreamRow], ...]

# Maps each row TypeId (e.g. "ev", "state") to its row class, for parse_row.
TypeToRow = {Row.TypeId: Row for Row in _EventRows}


class EventsStream(Stream):
    """We received a new event, or an event went from being an outlier to not
    """

    NAME = "events"

    def __init__(self, hs):
        self._store = hs.get_datastore()
        super().__init__(
            self._store.get_current_events_token,
            db_query_to_update_function(self._update_function),
        )

    async def _update_function(
        self, from_token: Token, current_token: Token, limit: int
    ) -> Iterable[tuple]:
        """Fetch the updates between two stream tokens.

        Combines new forward events and current-state deltas into a single
        stream of (position, TypeId, data) tuples.

        Args:
            from_token: exclusive lower bound of the stream positions to fetch
            current_token: inclusive upper bound
            limit: maximum number of rows to fetch per underlying query

        Returns:
            An iterable of (position, TypeId, row-data) tuples, ordered by
            stream position.
        """
        event_rows = await self._store.get_all_new_forward_event_rows(
            from_token, current_token, limit
        )
        event_updates = (
            (row[0], EventsStreamEventRow.TypeId, row[1:]) for row in event_rows
        )

        state_rows = await self._store.get_all_updated_current_state_deltas(
            from_token, current_token, limit
        )
        state_updates = (
            (row[0], EventsStreamCurrentStateRow.TypeId, row[1:]) for row in state_rows
        )

        # Both generators yield rows sorted by stream position (row[0]), so a
        # heap-merge produces a single stream in position order without
        # materializing either list.
        all_updates = heapq.merge(event_updates, state_updates)

        return all_updates

    @classmethod
    def parse_row(cls, row):
        """Parse a raw (TypeId, data) pair received over replication into an
        EventsStreamRow wrapping the appropriate concrete row type."""
        (typ, data) = row
        data = TypeToRow[typ].from_data(data)
        return EventsStreamRow(typ, data)