From ed787cf09edd77e39ad9da0b957359214de85287 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 16 Nov 2016 17:34:44 +0000 Subject: Hook up the send queue and create a federation sender worker --- synapse/replication/resource.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) (limited to 'synapse/replication/resource.py') diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 5a14c51d23..a77312ae34 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -44,6 +44,7 @@ STREAM_NAMES = ( ("caches",), ("to_device",), ("public_rooms",), + ("federation",), ) @@ -116,8 +117,10 @@ class ReplicationResource(Resource): self.sources = hs.get_event_sources() self.presence_handler = hs.get_presence_handler() self.typing_handler = hs.get_typing_handler() + self.federation_sender = hs.get_federation_sender() self.notifier = hs.notifier self.clock = hs.get_clock() + self.config = hs.get_config() self.putChild("remove_pushers", PusherResource(hs)) self.putChild("syncing_users", PresenceResource(hs)) @@ -134,6 +137,7 @@ class ReplicationResource(Resource): pushers_token = self.store.get_pushers_stream_token() caches_token = self.store.get_cache_stream_token() public_rooms_token = self.store.get_current_public_room_stream_id() + federation_token = self.federation_sender.get_current_token() defer.returnValue(_ReplicationToken( room_stream_token, @@ -148,6 +152,7 @@ class ReplicationResource(Resource): caches_token, int(stream_token.to_device_key), int(public_rooms_token), + int(federation_token), )) @request_handler() @@ -202,6 +207,7 @@ class ReplicationResource(Resource): yield self.caches(writer, current_token, limit, request_streams) yield self.to_device(writer, current_token, limit, request_streams) yield self.public_rooms(writer, current_token, limit, request_streams) + self.federation(writer, current_token, limit, request_streams) self.streams(writer, current_token, request_streams) logger.debug("Replicated %d rows", writer.total) @@ -465,6 +471,23 @@ class ReplicationResource(Resource): "position", "room_id", "visibility" ), position=upto_token) + def federation(self, writer, current_token, limit, request_streams): + if self.config.send_federation: + return + + current_position = current_token.federation + + federation = request_streams.get("federation") + + if federation is not None and federation != current_position: + federation_rows = self.federation_sender.get_replication_rows( + federation, limit, + ) + upto_token = _position_from_rows(federation_rows, current_position) + writer.write_header_and_rows("federation", federation_rows, ( + "position", "type", "content", + ), position=upto_token) + class _Writer(object): """Writes the streams as a JSON object as the response to the request""" @@ -497,6 +520,7 @@ class _Writer(object): class _ReplicationToken(collections.namedtuple("_ReplicationToken", ( "events", "presence", "typing", "receipts", "account_data", "backfill", "push_rules", "pushers", "state", "caches", "to_device", "public_rooms", + "federation", ))): __slots__ = [] -- cgit 1.4.1