summary refs log blame commit diff
path: root/synapse/federation/send_queue.py
blob: 657a93049766a0292f68d21446e012a26d8c33fc (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15













                                                                          














                                                                               
                      
 
                                                      

                                        
                            
                                  
 


                                    
 

                                                   
                                        
                                                    
 
                           
                                      
                                   
                                         
 








                                             
                                           

                                    
                                                                                
                                                                              









                                                                                 
 







                                                             
                                                                














                                                          
                                                               














































                                                                                         
                                            

                                                                          
            
                                                                 
                                     
                              





                                    
               
                                         
                                                            

                                
                                               
                                                 
                                     
                              
                                  


                                

                                                            
 
                                               
                                                 
                                     
                              
                                                        
                                               
 
                                                
                                     
                                               
                                               
 

                                




                                                                                     
             
                            



                                                                            

                                                 
                                 
 
                                                                                
                 
                                                                        
                                                        
 
                                           
                                           
                               
                                


                                                          


                                                 

                                            

                                                                           
 
                                                        


                                                           

                               

                                                        
                               
                                           

                                   

                                                                
                                                      


                                         
 
                                          

                                                                              
                                                  

                                        
 

                                


                                                                      




                                                                                       


                                                                


                                                                           


                                   



                                                                              











































































































































                                                                             
                                                                          

                                            
                                                                        
       
                                                                  











                                                                      


                                                                         


                                                



















                                                                     
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A federation sender that forwards things to be sent across replication to
a worker process.

It assumes there is a single worker process feeding off of it.

Each row in the replication stream consists of a type and some json, where the
types indicate whether they are presence, or edus, etc.

Ephemeral or non-event data are queued up in-memory. When the worker requests
updates since a particular point, all in-memory data from before that point is
dropped. We also expire things in the queue after 5 minutes, to ensure that a
dead worker doesn't cause the queues to grow limitlessly.

Events are replicated via a separate events stream.
"""

from .units import Edu

from synapse.storage.presence import UserPresenceState
from synapse.util.metrics import Measure
import synapse.metrics

from blist import sorteddict
from collections import namedtuple

import logging

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Metrics handle for this module; FederationRemoteSendQueue registers its
# per-queue "<name>_size" callbacks against it.
metrics = synapse.metrics.get_metrics_for(__name__)


class FederationRemoteSendQueue(object):
    """A drop in replacement for TransactionQueue

    Rather than sending anything itself, each ``send_*`` call records the
    data in memory against a monotonically increasing stream position and
    pokes the notifier. The queued data is read back out again through
    ``get_replication_rows``.
    """

    def __init__(self, hs):
        """
        Args:
            hs: the homeserver object; must provide ``hostname``,
                ``get_clock()`` and ``get_notifier()``.
        """
        self.server_name = hs.hostname
        self.clock = hs.get_clock()
        self.notifier = hs.get_notifier()

        # user_id -> most recently queued presence state for that user
        self.presence_map = {}
        # stream position -> list of (destination, user_id)
        self.presence_changed = sorteddict()

        # (destination, edu key) -> most recently queued Edu for that key
        self.keyed_edu = {}
        # stream position -> (destination, edu key)
        self.keyed_edu_changed = sorteddict()

        # stream position -> Edu
        self.edus = sorteddict()

        # stream position -> (destination, str(failure))
        self.failures = sorteddict()

        # stream position -> destination
        self.device_messages = sorteddict()

        # The next stream position to hand out.
        self.pos = 1
        # time (msec) at which a position was allocated -> that position
        self.pos_time = sorteddict()

        # Python only creates a new scope per function call, so the lambda
        # must be built inside a helper: that way it binds to this particular
        # queue object rather than to a loop variable that keeps changing.
        def register(name, queue):
            metrics.register_callback(
                name + "_size",
                lambda: len(queue),
            )

        for queue_name in [
            "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
            "edus", "failures", "device_messages", "pos_time",
        ]:
            register(queue_name, getattr(self, queue_name))

        # Periodically expire old entries so that a dead worker cannot make
        # the queues grow without bound.
        self.clock.looping_call(self._clear_queue, 30 * 1000)

    @staticmethod
    def _delete_keys_before(queue, position):
        """Delete every entry of a sorteddict whose key is < position"""
        keys = queue.keys()
        i = keys.bisect_left(position)
        # Copy the slice so we never iterate over something we are mutating.
        for key in list(keys[:i]):
            del queue[key]

    @staticmethod
    def _keys_between(queue, from_token, to_token):
        """Return the keys k of a sorteddict with from_token < k <= to_token"""
        keys = queue.keys()
        i = keys.bisect_right(from_token)
        j = keys.bisect_right(to_token)
        return keys[i:j]

    def _next_pos(self):
        """Allocate the next stream position, recording when it was allocated

        Returns:
            int: the newly allocated position
        """
        pos = self.pos
        self.pos += 1
        self.pos_time[self.clock.time_msec()] = pos
        return pos

    def _clear_queue(self):
        """Clear the queues for anything older than N minutes"""

        FIVE_MINUTES_AGO = 5 * 60 * 1000
        now = self.clock.time_msec()

        keys = self.pos_time.keys()
        idx = keys.bisect_left(now - FIVE_MINUTES_AGO)
        expired_times = list(keys[:idx])
        if not expired_times:
            return

        # pos_time is keyed by *timestamp*; the queues are keyed by stream
        # *position*. Translate the expired timestamps into positions before
        # clearing, otherwise we would compare positions against a (much
        # larger) msec timestamp and wipe every queue.
        newest_expired_pos = max(self.pos_time[ts] for ts in expired_times)
        for ts in expired_times:
            del self.pos_time[ts]

        # _clear_queue_before_pos is exclusive of the position it is given,
        # so add one to also drop the newest expired entry itself.
        self._clear_queue_before_pos(newest_expired_pos + 1)

    def _clear_queue_before_pos(self, position_to_delete):
        """Clear all the queues from before a given position

        Args:
            position_to_delete (int): entries with a stream position strictly
                less than this are dropped.
        """
        with Measure(self.clock, "send_queue._clear"):
            # Delete things out of presence maps
            self._delete_keys_before(self.presence_changed, position_to_delete)

            # Only keep presence state for users that are still referenced
            # by a remaining presence_changed entry.
            user_ids = set(
                user_id for uids in self.presence_changed.values() for _, user_id in uids
            )

            to_del = [
                user_id for user_id in self.presence_map if user_id not in user_ids
            ]
            for user_id in to_del:
                del self.presence_map[user_id]

            # Delete things out of keyed edus
            self._delete_keys_before(self.keyed_edu_changed, position_to_delete)

            # Likewise only keep keyed edus that are still referenced.
            live_keys = set(self.keyed_edu_changed.values())

            to_del = [edu_key for edu_key in self.keyed_edu if edu_key not in live_keys]
            for edu_key in to_del:
                del self.keyed_edu[edu_key]

            # Delete things out of edu map
            self._delete_keys_before(self.edus, position_to_delete)

            # Delete things out of failure map
            self._delete_keys_before(self.failures, position_to_delete)

            # Delete things out of device map
            self._delete_keys_before(self.device_messages, position_to_delete)

    def notify_new_events(self, current_id):
        """As per TransactionQueue"""
        # We don't need to replicate this as it gets sent down a different
        # stream.
        pass

    def send_edu(self, destination, edu_type, content, key=None):
        """As per TransactionQueue

        If ``key`` is given (it must be a tuple) only the latest EDU for that
        (destination, key) pair is kept; otherwise the EDU is queued
        individually.
        """
        pos = self._next_pos()

        edu = Edu(
            origin=self.server_name,
            destination=destination,
            edu_type=edu_type,
            content=content,
        )

        if key:
            assert isinstance(key, tuple)
            self.keyed_edu[(destination, key)] = edu
            self.keyed_edu_changed[pos] = (destination, key)
        else:
            self.edus[pos] = edu

        self.notifier.on_new_replication_data()

    def send_presence(self, destination, states):
        """As per TransactionQueue

        Args:
            destination (str)
            states: iterable of presence states, each with a ``user_id``
        """
        pos = self._next_pos()

        self.presence_map.update({
            state.user_id: state
            for state in states
        })

        self.presence_changed[pos] = [
            (destination, state.user_id) for state in states
        ]

        self.notifier.on_new_replication_data()

    def send_failure(self, failure, destination):
        """As per TransactionQueue

        The failure is stored as ``str(failure)``.
        """
        pos = self._next_pos()

        self.failures[pos] = (destination, str(failure))
        self.notifier.on_new_replication_data()

    def send_device_messages(self, destination):
        """As per TransactionQueue"""
        pos = self._next_pos()
        self.device_messages[pos] = destination
        self.notifier.on_new_replication_data()

    def get_current_token(self):
        """Return the most recently allocated stream position (0 if none)"""
        return self.pos - 1

    def federation_ack(self, token):
        """Drop everything queued before the given (acknowledged) position"""
        self._clear_queue_before_pos(token)

    def get_replication_rows(self, from_token, to_token, limit, federation_ack=None):
        """Get rows to be sent over federation between the two tokens

        Args:
            from_token (int)
            to_token(int)
            limit (int): currently ignored
            federation_ack (int): Optional. The position where the worker is
                explicitly acknowledged it has handled. Allows us to drop
                data from before that point

        Returns:
            list of tuple(int, str, object): ``(position, row TypeId,
            serialized row data)`` for every queued item whose position is in
            ``(from_token, to_token]``, sorted by position.
        """
        # TODO: Handle limit.

        # To handle restarts where we wrap around
        if from_token > self.pos:
            from_token = -1

        # list of tuple(int, BaseFederationRow), where the first is the position
        # of the federation stream.
        rows = []

        # There should be only one reader, so let's delete everything it has
        # acknowledged it has seen.
        if federation_ack:
            self._clear_queue_before_pos(federation_ack)

        # Fetch changed presence. A set is used so that the same
        # (destination, user_id) queued twice at one position yields one row.
        dest_user_ids = set(
            (pos, dest_user_id)
            for pos in self._keys_between(self.presence_changed, from_token, to_token)
            for dest_user_id in self.presence_changed[pos]
        )

        for (pos, (dest, user_id)) in dest_user_ids:
            rows.append((pos, PresenceRow(
                destination=dest,
                state=self.presence_map[user_id],
            )))

        # Fetch changed keyed edus
        for pos in self._keys_between(self.keyed_edu_changed, from_token, to_token):
            destination, edu_key = self.keyed_edu_changed[pos]
            rows.append((pos, KeyedEduRow(
                key=edu_key,
                edu=self.keyed_edu[(destination, edu_key)],
            )))

        # Fetch changed edus
        for pos in self._keys_between(self.edus, from_token, to_token):
            rows.append((pos, EduRow(self.edus[pos])))

        # Fetch changed failures
        for pos in self._keys_between(self.failures, from_token, to_token):
            destination, failure = self.failures[pos]
            rows.append((pos, FailureRow(
                destination=destination,
                failure=failure,
            )))

        # Fetch changed device messages
        for pos in self._keys_between(self.device_messages, from_token, to_token):
            rows.append((pos, DeviceRow(
                destination=self.device_messages[pos],
            )))

        # Sort rows based on pos
        rows.sort()

        return [(pos, row.TypeId, row.to_data()) for pos, row in rows]


class BaseFederationRow(object):
    """Common interface for the row types carried by the federation stream.

    Each concrete row type declares a ``TypeId`` and knows how to convert
    itself to and from the stream's wire data, and how to file itself into a
    ``BufferedToSend``.
    """

    # Unique string that identifies the row type. Must be overridden in
    # sub classes.
    TypeId = None

    @staticmethod
    def from_data(data):
        """Build a row from the data received over the federation stream.

        Args:
            data: The value of ``data`` from FederationStreamRow.data; its
                type depends on the type of stream.

        Raises:
            NotImplementedError: always; sub classes must implement this.
        """
        raise NotImplementedError()

    def to_data(self):
        """Convert this row into the form sent over the federation stream.

        Returns:
            The value to be placed in FederationStreamRow.data; its type
            depends on the type of stream.

        Raises:
            NotImplementedError: always; sub classes must implement this.
        """
        raise NotImplementedError()

    def add_to_buffer(self, buff):
        """File this row under the matching field of the given buffer.

        Rows are buffered so that everything which arrived together can be
        batched up and sent over federation in one go.

        Args:
            buff (BufferedToSend)

        Raises:
            NotImplementedError: always; sub classes must implement this.
        """
        raise NotImplementedError()


class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
    "destination",  # str
    "state",  # UserPresenceState
))):
    """Stream row carrying one presence state bound for one destination."""

    TypeId = "p"

    @staticmethod
    def from_data(data):
        """Rebuild a PresenceRow from the dict produced by ``to_data``."""
        destination = data["destination"]
        presence_state = UserPresenceState.from_dict(data["state"])
        return PresenceRow(destination, presence_state)

    def to_data(self):
        """Serialize to a dict with ``destination`` and ``state`` keys."""
        return dict(
            destination=self.destination,
            state=self.state.as_dict(),
        )

    def add_to_buffer(self, buff):
        """Append the state to the buffer's presence list for the destination."""
        states_for_destination = buff.presence.setdefault(self.destination, [])
        states_for_destination.append(self.state)


class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
    "key",  # tuple(str) - the edu key passed to send_edu
    "edu",  # Edu
))):
    """Stream row carrying an EDU that was queued under a key."""

    TypeId = "k"

    @staticmethod
    def from_data(data):
        """Rebuild a KeyedEduRow from the dict produced by ``to_data``."""
        # The key is converted back into a tuple here.
        edu_key = tuple(data["key"])
        edu = Edu(**data["edu"])
        return KeyedEduRow(edu_key, edu)

    def to_data(self):
        """Serialize to a dict with ``key`` and ``edu`` keys."""
        return dict(
            key=self.key,
            edu=self.edu.get_internal_dict(),
        )

    def add_to_buffer(self, buff):
        """Store the EDU under its key in the buffer's per-destination map."""
        edus_by_key = buff.keyed_edus.setdefault(self.edu.destination, {})
        edus_by_key[self.key] = self.edu


class EduRow(BaseFederationRow, namedtuple("EduRow", (
    "edu",  # Edu
))):
    """Stream row carrying a single un-keyed EDU."""

    TypeId = "e"

    @staticmethod
    def from_data(data):
        """Rebuild an EduRow from the dict produced by ``to_data``."""
        edu = Edu(**data)
        return EduRow(edu)

    def to_data(self):
        """Serialize to the EDU's internal dict."""
        serialized = self.edu.get_internal_dict()
        return serialized

    def add_to_buffer(self, buff):
        """Append the EDU to the buffer's list for its destination."""
        edus_for_destination = buff.edus.setdefault(self.edu.destination, [])
        edus_for_destination.append(self.edu)


class FailureRow(BaseFederationRow, namedtuple("FailureRow", (
    "destination",  # str
    "failure",
))):
    """Stream row carrying a failure to be reported to a destination."""

    TypeId = "f"

    @staticmethod
    def from_data(data):
        """Rebuild a FailureRow from the dict produced by ``to_data``."""
        destination = data["destination"]
        failure = data["failure"]
        return FailureRow(destination, failure)

    def to_data(self):
        """Serialize to a dict with ``destination`` and ``failure`` keys."""
        return dict(
            destination=self.destination,
            failure=self.failure,
        )

    def add_to_buffer(self, buff):
        """Append the failure to the buffer's list for the destination."""
        failures_for_destination = buff.failures.setdefault(self.destination, [])
        failures_for_destination.append(self.failure)


class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", (
    "destination",  # str
))):
    """Stream row naming a destination that has device messages queued."""

    TypeId = "d"

    @staticmethod
    def from_data(data):
        """Rebuild a DeviceRow; the stream data is the destination itself."""
        return DeviceRow(data)

    def to_data(self):
        """Serialize to the bare destination."""
        destination = self.destination
        return destination

    def add_to_buffer(self, buff):
        """Add the destination to the buffer's set of device destinations."""
        destinations = buff.device_destinations
        destinations.add(self.destination)


# Lookup table from a row's TypeId string to the class used to parse it.
TypeToRow = dict(
    (row_class.TypeId, row_class)
    for row_class in [
        PresenceRow,
        KeyedEduRow,
        EduRow,
        FailureRow,
        DeviceRow,
    ]
)


# Collects parsed federation stream rows, grouped by kind, so that they can
# be handed to the federation sender in batches.
BufferedToSend = namedtuple(
    "BufferedToSend",
    [
        "presence",  # dict of destination -> [UserPresenceState]
        "keyed_edus",  # dict of destination -> { key -> Edu }
        "edus",  # dict of destination -> [Edu]
        "failures",  # dict of destination -> [failures]
        "device_destinations",  # set of destinations
    ],
)


def process_rows_for_federation(federation_sender, rows):
    """Parse a list of rows from the federation stream and put them in the
    transaction queue ready for sending to the relevant homeservers.

    Rows with an unrecognized type are logged and skipped.

    Args:
        federation_sender (TransactionQueue)
        rows (list(synapse.replication.tcp.streams.FederationStreamRow))
    """

    # The federation stream contains a bunch of different types of
    # rows that need to be handled differently. We parse the rows, put
    # them into the appropriate collection and then send them off.

    buff = BufferedToSend(
        presence={},
        keyed_edus={},
        edus={},
        failures={},
        device_destinations=set(),
    )

    # Parse the rows in the stream and add to the buffer
    for row in rows:
        if row.type not in TypeToRow:
            logger.error("Unrecognized federation row type %r", row.type)
            continue

        RowType = TypeToRow[row.type]
        parsed_row = RowType.from_data(row.data)
        parsed_row.add_to_buffer(buff)

    for destination, states in buff.presence.items():
        federation_sender.send_presence(destination, states)

    # Each Edu carries its own destination, so only the values are needed.
    for edu_map in buff.keyed_edus.values():
        for key, edu in edu_map.items():
            federation_sender.send_edu(
                edu.destination, edu.edu_type, edu.content, key=key,
            )

    for edu_list in buff.edus.values():
        for edu in edu_list:
            federation_sender.send_edu(
                edu.destination, edu.edu_type, edu.content, key=None,
            )

    for destination, failure_list in buff.failures.items():
        for failure in failure_list:
            # send_failure takes (failure, destination), in that order.
            federation_sender.send_failure(failure, destination)

    for destination in buff.device_destinations:
        federation_sender.send_device_messages(destination)