From ed787cf09edd77e39ad9da0b957359214de85287 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 16 Nov 2016 17:34:44 +0000 Subject: Hook up the send queue and create a federation sender worker --- synapse/app/federation_sender.py | 302 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 302 insertions(+) create mode 100644 synapse/app/federation_sender.py (limited to 'synapse/app/federation_sender.py') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py new file mode 100644 index 0000000000..7a4fec4a66 --- /dev/null +++ b/synapse/app/federation_sender.py @@ -0,0 +1,302 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse + +from synapse.server import HomeServer +from synapse.config._base import ConfigError +from synapse.config.logger import setup_logging +from synapse.config.homeserver import HomeServerConfig +from synapse.crypto import context_factory +from synapse.http.site import SynapseSite +from synapse.federation import send_queue +from synapse.federation.units import Edu +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.storage.engines import create_engine +from synapse.storage.presence import UserPresenceState +from synapse.util.async import sleep +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string + +from synapse import events + +from twisted.internet import reactor, defer +from twisted.web.resource import Resource + +from daemonize import Daemonize + +import sys +import logging +import gc +import ujson as json + +logger = logging.getLogger("synapse.app.appservice") + + +class FederationSenderSlaveStore( + SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore, + SlavedRegistrationStore, +): + pass + + +class FederationSenderServer(HomeServer): + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_address = listener_config.get("bind_address", "") + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + + root_resource = create_resource_tree(resources, Resource()) + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=bind_address + ) + logger.info("Synapse federation_sender now listening on port %d", port) + + def start_listening(self, listeners): + for listener in listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + @defer.inlineCallbacks + def replicate(self): + http_client = self.get_simple_http_client() + store = self.get_datastore() + replication_url = self.config.worker_replication_url + send_handler = self._get_send_handler() + + def replicate(results): + stream = results.get("events") + if stream: + # max_stream_id = stream["position"] + # TODO + pass + + while True: + try: + args = store.stream_positions() + args.update(send_handler.stream_positions()) + args["timeout"] = 30000 + result = yield http_client.get_json(replication_url, args=args) + yield store.process_replication(result) + send_handler.process_replication(result) + replicate(result) + except: + logger.exception("Error replicating from %r", replication_url) + yield sleep(30) + + def _get_send_handler(self): + try: + return self._send_handler + except AttributeError: + self._send_handler = FederationSenderHandler(self) + return self._send_handler + + +def start(config_options): + try: + config = HomeServerConfig.load_config( + "Synapse federation sender", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + assert config.worker_app == "synapse.app.federation_sender" + + setup_logging(config.worker_log_config, config.worker_log_file) + + events.USE_FROZEN_DICTS = config.use_frozen_dicts + + database_engine = create_engine(config.database_config) + + if config.send_federation: + sys.stderr.write( + "\nThe send_federation must be disabled in the main synapse process" + "\nbefore they can be run in a separate worker." + "\nPlease add ``send_federation: false`` to the main config" + "\n" + ) + sys.exit(1) + + # Force the pushers to start since they will be disabled in the main config + config.send_federation = True + + tls_server_context_factory = context_factory.ServerContextFactory(config) + + ps = FederationSenderServer( + config.server_name, + db_config=config.database_config, + tls_server_context_factory=tls_server_context_factory, + config=config, + version_string="Synapse/" + get_version_string(synapse), + database_engine=database_engine, + ) + + ps.setup() + ps.start_listening(config.worker_listeners) + + def run(): + with LoggingContext("run"): + logger.info("Running") + change_resource_limit(config.soft_file_limit) + if config.gc_thresholds: + gc.set_threshold(*config.gc_thresholds) + reactor.run() + + def start(): + ps.replicate() + ps.get_datastore().start_profiling() + ps.get_state_handler().start_caching() + + reactor.callWhenRunning(start) + + if config.worker_daemonize: + daemon = Daemonize( + app="synapse-federation-sender", + pid=config.worker_pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() + + +class FederationSenderHandler(object): + def __init__(self, hs): + self.federation_sender = hs.get_federation_sender() + + self._latest_room_serial = -1 + self._room_serials = {} + self._room_typing = {} + + def stream_positions(self): + # We must update this token from the response of the previous + # sync. In particular, the stream id may "reset" back to zero/a low + # value which we *must* use for the next replication request. + return {"federation": self._latest_room_serial} + + def process_replication(self, result): + stream = result.get("federation") + if stream: + self._latest_room_serial = int(stream["position"]) + + presence_to_send = {} + keyed_edus = {} + edus = {} + failures = {} + + for row in stream["rows"]: + position, typ, content_js = row + content = json.loads(content_js) + + if typ == send_queue.PRESENCE_TYPE: + destination = content["destination"] + state = UserPresenceState.from_dict(content["state"]) + + presence_to_send.setdefault(destination, []).append(state) + elif typ == send_queue.KEYED_EDU_TYPE: + key = content["key"] + edu = Edu(**content["edu"]) + + keyed_edus.setdefault(edu.destination, {})[key] = edu + elif typ == send_queue.EDU_TYPE: + edu = Edu(**content) + + edus.setdefault(edu.destination, []).append(edu) + elif typ == send_queue.FAILURE_TYPE: + destination = content["destination"] + failure = content["failure"] + + failures.setdefault(destination, []).append(failure) + else: + raise Exception("Unrecognised federation type: %r", typ) + + for destination, states in presence_to_send.items(): + self.federation_sender.send_presence(destination, states) + + for destination, edu_map in keyed_edus.items(): + for key, edu in edu_map.items(): + self.federation_sender.send_edu( + edu.destination, edu.edu_type, edu.content, key=key, + ) + + for destination, edu_list in edus.items(): + for edu in edu_list: + self.federation_sender.send_edu( + edu.destination, edu.edu_type, edu.content, key=None, + ) + + for destination, failure_list in failures.items(): + for failure in failure_list: + self.federation_sender.send_failure(destination, failure) + + +if __name__ == '__main__': + with LoggingContext("main"): + start(sys.argv[1:]) -- cgit 1.4.1 From f8ee66250a16cb9dd3af01fb1150ff18cfebbc39 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 17 Nov 2016 15:46:44 +0000 Subject: Handle sending events and device messages over federation --- synapse/app/federation_sender.py | 31 ++++++++++-------- synapse/federation/send_queue.py | 38 ++++++++++++++++++---- synapse/federation/transaction_queue.py | 32 ++++++++++++++++++ synapse/handlers/message.py | 13 +------- synapse/notifier.py | 2 ++ synapse/replication/resource.py | 2 +- synapse/replication/slave/storage/deviceinbox.py | 15 ++++++--- synapse/replication/slave/storage/events.py | 11 +++++++ synapse/replication/slave/storage/transactions.py | 4 +-- synapse/storage/deviceinbox.py | 26 ++++++++------- synapse/storage/prepare_database.py | 2 +- .../delta/39/device_federation_stream_idx.sql | 16 +++++++++ synapse/storage/stream.py | 31 ++++++++++++++++++ synapse/util/jsonobject.py | 17 ++++++++-- 14 files changed, 185 insertions(+), 55 deletions(-) create mode 100644 synapse/storage/schema/delta/39/device_federation_stream_idx.sql (limited to 'synapse/app/federation_sender.py') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 7a4fec4a66..32113c175c 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -127,13 +127,6 @@ class FederationSenderServer(HomeServer): replication_url = self.config.worker_replication_url send_handler = self._get_send_handler() - def replicate(results): - stream = results.get("events") - if stream: - # max_stream_id = stream["position"] - # TODO - pass - while True: try: args = store.stream_positions() @@ -142,7 +135,6 @@ class FederationSenderServer(HomeServer): result = yield http_client.get_json(replication_url, args=args) yield store.process_replication(result) send_handler.process_replication(result) - replicate(result) except: logger.exception("Error replicating from %r", replication_url) yield sleep(30) @@ -242,16 +234,17 @@ class FederationSenderHandler(object): return {"federation": self._latest_room_serial} def process_replication(self, result): - stream = result.get("federation") - if stream: - self._latest_room_serial = int(stream["position"]) + fed_stream = result.get("federation") + if fed_stream: + self._latest_room_serial = int(fed_stream["position"]) presence_to_send = {} keyed_edus = {} edus = {} failures = {} + device_destinations = set() - for row in stream["rows"]: + for row in fed_stream["rows"]: position, typ, content_js = row content = json.loads(content_js) @@ -264,7 +257,9 @@ class FederationSenderHandler(object): key = content["key"] edu = Edu(**content["edu"]) - keyed_edus.setdefault(edu.destination, {})[key] = edu + keyed_edus.setdefault( + edu.destination, {} + )[(edu.destination, tuple(key))] = edu elif typ == send_queue.EDU_TYPE: edu = Edu(**content) @@ -274,6 +269,8 @@ class FederationSenderHandler(object): failure = content["failure"] failures.setdefault(destination, []).append(failure) + elif typ == send_queue.DEVICE_MESSAGE_TYPE: + device_destinations.add(content["destination"]) else: raise Exception("Unrecognised federation type: %r", typ) @@ -296,6 +293,14 @@ class FederationSenderHandler(object): for failure in failure_list: self.federation_sender.send_failure(destination, failure) + for destination in device_destinations: + self.federation_sender.send_device_messages(destination) + + event_stream = result.get("events") + if event_stream: + latest_pos = event_stream["position"] + self.federation_sender.notify_new_events(latest_pos) + if __name__ == '__main__': with LoggingContext("main"): diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index d439be050a..3fc625c4dd 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -23,11 +23,13 @@ PRESENCE_TYPE = "p" KEYED_EDU_TYPE = "k" EDU_TYPE = "e" FAILURE_TYPE = "f" +DEVICE_MESSAGE_TYPE = "d" class FederationRemoteSendQueue(object): def __init__(self, hs): + self.server_name = hs.hostname self.clock = hs.get_clock() # TODO: Add metrics for size of lists below @@ -45,6 +47,8 @@ class FederationRemoteSendQueue(object): self.pos = 1 self.pos_time = sorteddict() + self.device_messages = sorteddict() + self.clock.looping_call(self._clear_queue, 30 * 1000) def _next_pos(self): @@ -111,6 +115,15 @@ class FederationRemoteSendQueue(object): for key in keys[:i]: del self.failures[key] + # Delete things out of device map + keys = self.device_messages.keys() + i = keys.bisect_left(position_to_delete) + for key in keys[:i]: + del self.device_messages[key] + + def notify_new_events(self, current_id): + pass + def send_edu(self, destination, edu_type, content, key=None): pos = self._next_pos() @@ -122,6 +135,7 @@ class FederationRemoteSendQueue(object): ) if key: + assert isinstance(key, tuple) self.keyed_edu[(destination, key)] = edu self.keyed_edu_changed[pos] = (destination, key) else: @@ -148,9 +162,9 @@ class FederationRemoteSendQueue(object): # This gets sent down a separate path pass - def notify_new_device_message(self, destination): - # TODO - pass + def send_device_messages(self, destination): + pos = self._next_pos() + self.device_messages[pos] = destination def get_current_token(self): return self.pos - 1 @@ -188,11 +202,11 @@ class FederationRemoteSendQueue(object): i = keys.bisect_right(token) keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:]) - for (pos, edu_key) in keyed_edus: + for (pos, (destination, edu_key)) in keyed_edus: rows.append( (pos, KEYED_EDU_TYPE, ujson.dumps({ "key": edu_key, - "edu": self.keyed_edu[edu_key].get_dict(), + "edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(), })) ) @@ -202,7 +216,7 @@ class FederationRemoteSendQueue(object): edus = set((k, self.edus[k]) for k in keys[i:]) for (pos, edu) in edus: - rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_dict()))) + rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict()))) # Fetch changed failures keys = self.failures.keys() @@ -210,11 +224,21 @@ class FederationRemoteSendQueue(object): failures = set((k, self.failures[k]) for k in keys[i:]) for (pos, (destination, failure)) in failures: - rows.append((pos, None, FAILURE_TYPE, ujson.dumps({ + rows.append((pos, FAILURE_TYPE, ujson.dumps({ "destination": destination, "failure": failure, }))) + # Fetch changed device messages + keys = self.device_messages.keys() + i = keys.bisect_right(token) + device_messages = set((k, self.device_messages[k]) for k in keys[i:]) + + for (pos, destination) in device_messages: + rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({ + "destination": destination, + }))) + # Sort rows based on pos rows.sort() diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 5d4f244377..aa664beead 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -26,6 +26,7 @@ from synapse.util.retryutils import ( get_retry_limiter, NotRetryingDestination, ) from synapse.util.metrics import measure_func +from synapse.types import get_domain_from_id from synapse.handlers.presence import format_user_presence_state import synapse.metrics @@ -54,6 +55,7 @@ class TransactionQueue(object): self.server_name = hs.hostname self.store = hs.get_datastore() + self.state = hs.get_state_handler() self.transaction_actions = TransactionActions(self.store) self.transport_layer = hs.get_federation_transport_client() @@ -103,6 +105,9 @@ class TransactionQueue(object): self._order = 1 + self._is_processing = False + self._last_token = 0 + def can_send_to(self, destination): """Can we send messages to the given server? @@ -123,6 +128,33 @@ class TransactionQueue(object): else: return not destination.startswith("localhost") + @defer.inlineCallbacks + def notify_new_events(self, current_id): + if self._is_processing: + return + + try: + self._is_processing = True + while True: + self._last_token, events = yield self.store.get_all_new_events_stream( + self._last_token, current_id, limit=20, + ) + + if not events: + break + + for event in events: + users_in_room = yield self.state.get_current_user_in_room( + event.room_id, latest_event_ids=[event.event_id], + ) + + destinations = [ + get_domain_from_id(user_id) for user_id in users_in_room + ] + self.send_pdu(event, destinations) + finally: + self._is_processing = False + def send_pdu(self, pdu, destinations): # We loop through all destinations to see whether we already have # a transaction in progress. If we do, stick it in the pending_pdus diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 81df45177a..fd09397226 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -22,7 +22,7 @@ from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.push.action_generator import ActionGenerator from synapse.types import ( - UserID, RoomAlias, RoomStreamToken, get_domain_from_id + UserID, RoomAlias, RoomStreamToken, ) from synapse.util.async import run_on_reactor, ReadWriteLock from synapse.util.logcontext import preserve_fn @@ -599,13 +599,6 @@ class MessageHandler(BaseHandler): event_stream_id, max_stream_id ) - users_in_room = yield self.store.get_joined_users_from_context(event, context) - - destinations = [ - get_domain_from_id(user_id) for user_id in users_in_room - if not self.hs.is_mine_id(user_id) - ] - @defer.inlineCallbacks def _notify(): yield run_on_reactor() @@ -618,7 +611,3 @@ class MessageHandler(BaseHandler): # If invite, remove room_state from unsigned before sending. event.unsigned.pop("invite_room_state", None) - - preserve_fn(federation_handler.handle_new_event)( - event, destinations=destinations, - ) diff --git a/synapse/notifier.py b/synapse/notifier.py index 48653ae843..d528d1c1e0 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -143,6 +143,7 @@ class Notifier(object): self.clock = hs.get_clock() self.appservice_handler = hs.get_application_service_handler() + self.federation_sender = hs.get_federation_sender() self.state_handler = hs.get_state_handler() self.clock.looping_call( @@ -219,6 +220,7 @@ class Notifier(object): """Notify any user streams that are interested in this room event""" # poke any interested application service. self.appservice_handler.notify_interested_services(room_stream_id) + self.federation_sender.notify_new_events(room_stream_id) if event.type == EventTypes.Member and event.membership == Membership.JOIN: self._user_joined_room(event.state_key, event.room_id) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index a77312ae34..e708811326 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -453,7 +453,7 @@ class ReplicationResource(Resource): ) upto_token = _position_from_rows(to_device_rows, current_position) writer.write_header_and_rows("to_device", to_device_rows, ( - "position", "user_id", "device_id", "message_json" + "position", "entity", ), position=upto_token) @defer.inlineCallbacks diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index 373212d42d..cc860f9f9b 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -38,6 +38,7 @@ class SlavedDeviceInboxStore(BaseSlavedStore): get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__ get_new_device_msgs_for_remote = DataStore.get_new_device_msgs_for_remote.__func__ delete_messages_for_device = DataStore.delete_messages_for_device.__func__ + delete_device_msgs_for_remote = DataStore.delete_device_msgs_for_remote.__func__ def stream_positions(self): result = super(SlavedDeviceInboxStore, self).stream_positions() @@ -50,9 +51,15 @@ class SlavedDeviceInboxStore(BaseSlavedStore): self._device_inbox_id_gen.advance(int(stream["position"])) for row in stream["rows"]: stream_id = row[0] - user_id = row[1] - self._device_inbox_stream_cache.entity_has_changed( - user_id, stream_id - ) + entity = row[1] + + if entity.startswith("@"): + self._device_inbox_stream_cache.entity_has_changed( + entity, stream_id + ) + else: + self._device_federation_outbox_stream_cache.entity_has_changed( + entity, stream_id + ) return super(SlavedDeviceInboxStore, self).process_replication(result) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 0c26e96e98..ef8713b55d 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -26,6 +26,11 @@ from synapse.storage.stream import StreamStore from synapse.util.caches.stream_change_cache import StreamChangeCache import ujson as json +import logging + + +logger = logging.getLogger(__name__) + # So, um, we want to borrow a load of functions intended for reading from # a DataStore, but we don't want to take functions that either write to the @@ -180,6 +185,8 @@ class SlavedEventStore(BaseSlavedStore): EventFederationStore.__dict__["_get_forward_extremeties_for_room"] ) + get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__ + def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() result["events"] = self._stream_id_gen.get_current_token() @@ -194,6 +201,10 @@ class SlavedEventStore(BaseSlavedStore): stream = result.get("events") if stream: self._stream_id_gen.advance(int(stream["position"])) + + if stream["rows"]: + logger.info("Got %d event rows", len(stream["rows"])) + for row in stream["rows"]: self._process_replication_row( row, backfilled=False, state_resets=state_resets diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py index c459301b76..d92cea4ab1 100644 --- a/synapse/replication/slave/storage/transactions.py +++ b/synapse/replication/slave/storage/transactions.py @@ -25,8 +25,8 @@ class TransactionStore(BaseSlavedStore): ].orig _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__ - def prep_send_transaction(self, transaction_id, destination, origin_server_ts): - return [] + prep_send_transaction = DataStore.prep_send_transaction.__func__ + delivered_txn = DataStore.delivered_txn.__func__ # For now, don't record the destination rety timings def set_destination_retry_timings(*args, **kwargs): diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index f640e73714..87398d60bc 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -269,27 +269,29 @@ class DeviceInboxStore(SQLBaseStore): return defer.succeed([]) def get_all_new_device_messages_txn(txn): + # We limit like this as we might have multiple rows per stream_id, and + # we want to make sure we always get all entries for any stream_id + # we return. + upper_pos = min(current_pos, last_pos + limit) sql = ( - "SELECT stream_id FROM device_inbox" + "SELECT stream_id, user_id" + " FROM device_inbox" " WHERE ? < stream_id AND stream_id <= ?" - " GROUP BY stream_id" " ORDER BY stream_id ASC" - " LIMIT ?" ) - txn.execute(sql, (last_pos, current_pos, limit)) - stream_ids = txn.fetchall() - if not stream_ids: - return [] - max_stream_id_in_limit = stream_ids[-1] + txn.execute(sql, (last_pos, upper_pos)) + rows = txn.fetchall() sql = ( - "SELECT stream_id, user_id, device_id, message_json" - " FROM device_inbox" + "SELECT stream_id, destination" + " FROM device_federation_outbox" " WHERE ? < stream_id AND stream_id <= ?" " ORDER BY stream_id ASC" ) - txn.execute(sql, (last_pos, max_stream_id_in_limit)) - return txn.fetchall() + txn.execute(sql, (last_pos, upper_pos)) + rows.extend(txn.fetchall()) + + return rows return self.runInteraction( "get_all_new_device_messages", get_all_new_device_messages_txn diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 6576a30098..e46ae6502e 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 38 +SCHEMA_VERSION = 39 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/39/device_federation_stream_idx.sql b/synapse/storage/schema/delta/39/device_federation_stream_idx.sql new file mode 100644 index 0000000000..00be801e90 --- /dev/null +++ b/synapse/storage/schema/delta/39/device_federation_stream_idx.sql @@ -0,0 +1,16 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE INDEX device_federation_outbox_id ON device_federation_outbox(stream_id); diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 888b1cb35d..f34cb78f9a 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -765,3 +765,34 @@ class StreamStore(SQLBaseStore): "token": end_token, }, } + + @defer.inlineCallbacks + def get_all_new_events_stream(self, from_id, current_id, limit): + """Get all new events""" + + def get_all_new_events_stream_txn(txn): + sql = ( + "SELECT e.stream_ordering, e.event_id" + " FROM events AS e" + " WHERE" + " ? < e.stream_ordering AND e.stream_ordering <= ?" + " ORDER BY e.stream_ordering ASC" + " LIMIT ?" + ) + + txn.execute(sql, (from_id, current_id, limit)) + rows = txn.fetchall() + + upper_bound = current_id + if len(rows) == limit: + upper_bound = rows[-1][0] + + return upper_bound, [row[1] for row in rows] + + upper_bound, event_ids = yield self.runInteraction( + "get_all_new_events_stream", get_all_new_events_stream_txn, + ) + + events = yield self._get_events(event_ids) + + defer.returnValue((upper_bound, events)) diff --git a/synapse/util/jsonobject.py b/synapse/util/jsonobject.py index 3fd5c3d9fd..d668e5a6b8 100644 --- a/synapse/util/jsonobject.py +++ b/synapse/util/jsonobject.py @@ -76,15 +76,26 @@ class JsonEncodedObject(object): d.update(self.unrecognized_keys) return d + def get_internal_dict(self): + d = { + k: _encode(v, internal=True) for (k, v) in self.__dict__.items() + if k in self.valid_keys + } + d.update(self.unrecognized_keys) + return d + def __str__(self): return "(%s, %s)" % (self.__class__.__name__, repr(self.__dict__)) -def _encode(obj): +def _encode(obj, internal=False): if type(obj) is list: - return [_encode(o) for o in obj] + return [_encode(o, internal=internal) for o in obj] if isinstance(obj, JsonEncodedObject): - return obj.get_dict() + if internal: + return obj.get_internal_dict() + else: + return obj.get_dict() return obj -- cgit 1.4.1 From 7c9cdb22453d1a442e5c280149aeeff4d46da215 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 21 Nov 2016 11:28:37 +0000 Subject: Store federation stream positions in the database --- synapse/app/federation_sender.py | 38 +++++++++++++--------- synapse/federation/transaction_queue.py | 21 +++++++++--- synapse/replication/slave/storage/events.py | 3 ++ synapse/storage/_base.py | 18 +++++++--- .../schema/delta/39/federation_out_position.sql | 22 +++++++++++++ synapse/storage/stream.py | 16 +++++++++ 6 files changed, 94 insertions(+), 24 deletions(-) create mode 100644 synapse/storage/schema/delta/39/federation_out_position.sql (limited to 'synapse/app/federation_sender.py') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 32113c175c..6678667c35 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -125,27 +125,22 @@ class FederationSenderServer(HomeServer): http_client = self.get_simple_http_client() store = self.get_datastore() replication_url = self.config.worker_replication_url - send_handler = self._get_send_handler() + send_handler = FederationSenderHandler(self) + + send_handler.on_start() while True: try: args = store.stream_positions() - args.update(send_handler.stream_positions()) + args.update((yield send_handler.stream_positions())) args["timeout"] = 30000 result = yield http_client.get_json(replication_url, args=args) yield store.process_replication(result) - send_handler.process_replication(result) + yield send_handler.process_replication(result) except: logger.exception("Error replicating from %r", replication_url) yield sleep(30) - def _get_send_handler(self): - try: - return self._send_handler - except AttributeError: - self._send_handler = FederationSenderHandler(self) - return self._send_handler - def start(config_options): try: @@ -221,22 +216,29 @@ def start(config_options): class FederationSenderHandler(object): def __init__(self, hs): + self.store = hs.get_datastore() self.federation_sender = hs.get_federation_sender() - self._latest_room_serial = -1 self._room_serials = {} self._room_typing = {} + def on_start(self): + # There may be some events that are persisted but haven't been sent, + # so send them now. + self.federation_sender.notify_new_events( + self.store.get_room_max_stream_ordering() + ) + + @defer.inlineCallbacks def stream_positions(self): - # We must update this token from the response of the previous - # sync. In particular, the stream id may "reset" back to zero/a low - # value which we *must* use for the next replication request. - return {"federation": self._latest_room_serial} + stream_id = yield self.store.get_federation_out_pos("federation") + defer.returnValue({"federation": stream_id}) + @defer.inlineCallbacks def process_replication(self, result): fed_stream = result.get("federation") if fed_stream: - self._latest_room_serial = int(fed_stream["position"]) + latest_id = int(fed_stream["position"]) presence_to_send = {} keyed_edus = {} @@ -296,6 +298,10 @@ class FederationSenderHandler(object): for destination in device_destinations: self.federation_sender.send_device_messages(destination) + yield self.store.update_federation_out_pos( + "federation", latest_id + ) + event_stream = result.get("events") if event_stream: latest_pos = event_stream["position"] diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index aa664beead..1b0ea070c2 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -106,7 +106,7 @@ class TransactionQueue(object): self._order = 1 self._is_processing = False - self._last_token = 0 + self._last_poked_id = -1 def can_send_to(self, destination): """Can we send messages to the given server? @@ -130,17 +130,22 @@ class TransactionQueue(object): @defer.inlineCallbacks def notify_new_events(self, current_id): + self._last_poked_id = max(current_id, self._last_poked_id) + if self._is_processing: return try: self._is_processing = True while True: - self._last_token, events = yield self.store.get_all_new_events_stream( - self._last_token, current_id, limit=20, + last_token = yield self.store.get_federation_out_pos("events") + next_token, events = yield self.store.get_all_new_events_stream( + last_token, self._last_poked_id, limit=20, ) - if not events: + logger.debug("Handling %s -> %s", last_token, next_token) + + if not events and next_token >= self._last_poked_id: break for event in events: @@ -151,7 +156,15 @@ class TransactionQueue(object): destinations = [ get_domain_from_id(user_id) for user_id in users_in_room ] + + logger.debug("Sending %s to %r", event, destinations) + self.send_pdu(event, destinations) + + yield self.store.update_federation_out_pos( + "events", next_token + ) + finally: self._is_processing = False diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index ef8713b55d..64f18bbb3e 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -187,6 +187,9 @@ class SlavedEventStore(BaseSlavedStore): get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__ + get_federation_out_pos = DataStore.get_federation_out_pos.__func__ + update_federation_out_pos = DataStore.update_federation_out_pos.__func__ + def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() result["events"] = self._stream_id_gen.get_current_token() diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index d828d6ee1d..d3686b9690 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -561,12 +561,17 @@ class SQLBaseStore(object): @staticmethod def _simple_select_onecol_txn(txn, table, keyvalues, retcol): + if keyvalues: + where = " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys()) + else: + where = "" + sql = ( - "SELECT %(retcol)s FROM %(table)s WHERE %(where)s" + "SELECT %(retcol)s FROM %(table)s %(where)s" ) % { "retcol": retcol, "table": table, - "where": " AND ".join("%s = ?" % k for k in keyvalues.keys()), + "where": where, } txn.execute(sql, keyvalues.values()) @@ -744,10 +749,15 @@ class SQLBaseStore(object): @staticmethod def _simple_update_one_txn(txn, table, keyvalues, updatevalues): - update_sql = "UPDATE %s SET %s WHERE %s" % ( + if keyvalues: + where = " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys()) + else: + where = "" + + update_sql = "UPDATE %s SET %s %s" % ( table, ", ".join("%s = ?" % (k,) for k in updatevalues), - " AND ".join("%s = ?" % (k,) for k in keyvalues) + where, ) txn.execute( diff --git a/synapse/storage/schema/delta/39/federation_out_position.sql b/synapse/storage/schema/delta/39/federation_out_position.sql new file mode 100644 index 0000000000..edbd8e132f --- /dev/null +++ b/synapse/storage/schema/delta/39/federation_out_position.sql @@ -0,0 +1,22 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + CREATE TABLE federation_stream_position( + type TEXT NOT NULL, + stream_id INTEGER NOT NULL + ); + + INSERT INTO federation_stream_position (type, stream_id) VALUES ('federation', -1); + INSERT INTO federation_stream_position (type, stream_id) VALUES ('events', -1); diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index f34cb78f9a..7fa63b58a7 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -796,3 +796,19 @@ class StreamStore(SQLBaseStore): events = yield self._get_events(event_ids) defer.returnValue((upper_bound, events)) + + def get_federation_out_pos(self, typ): + return self._simple_select_one_onecol( + table="federation_stream_position", + retcol="stream_id", + keyvalues={"type": typ}, + desc="get_federation_out_pos" + ) + + def update_federation_out_pos(self, typ, stream_id): + return self._simple_update_one( + table="federation_stream_position", + keyvalues={"type": typ}, + updatevalues={"stream_id": stream_id}, + desc="update_federation_out_pos", + ) -- cgit 1.4.1 From 50934ce4604001898707f75179dd748884659f12 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 21 Nov 2016 16:55:23 +0000 Subject: Comments --- synapse/app/federation_sender.py | 12 ++++++++++++ synapse/federation/send_queue.py | 26 ++++++++++++++++++++++++++ synapse/federation/transaction_queue.py | 3 +++ 3 files changed, 41 insertions(+) (limited to 'synapse/app/federation_sender.py') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 6678667c35..ba2b4c2615 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -215,6 +215,9 @@ def start(config_options): class FederationSenderHandler(object): + """Processes the replication stream and forwards the appropriate entries + to the federation sender. + """ def __init__(self, hs): self.store = hs.get_datastore() self.federation_sender = hs.get_federation_sender() @@ -236,16 +239,22 @@ class FederationSenderHandler(object): @defer.inlineCallbacks def process_replication(self, result): + # The federation stream contains things that we want to send out, e.g. + # presence, typing, etc. fed_stream = result.get("federation") if fed_stream: latest_id = int(fed_stream["position"]) + # The federation stream containis a bunch of different types of + # rows that need to be handled differently. We parse the rows, put + # them into the appropriate collection and then send them off. presence_to_send = {} keyed_edus = {} edus = {} failures = {} device_destinations = set() + # Parse the rows in the stream for row in fed_stream["rows"]: position, typ, content_js = row content = json.loads(content_js) @@ -276,6 +285,7 @@ class FederationSenderHandler(object): else: raise Exception("Unrecognised federation type: %r", typ) + # We've finished collecting, send everything off for destination, states in presence_to_send.items(): self.federation_sender.send_presence(destination, states) @@ -298,10 +308,12 @@ class FederationSenderHandler(object): for destination in device_destinations: self.federation_sender.send_device_messages(destination) + # Record where we are in the stream. yield self.store.update_federation_out_pos( "federation", latest_id ) + # We also need to poke the federation sender when new events happen event_stream = result.get("events") if event_stream: latest_pos = event_stream["position"] diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 99b5835780..98cf125cb5 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -13,6 +13,22 @@ # See the License for the specific language governing permissions and # limitations under the License. +"""A federation sender that forwards things to be sent across replication to +a worker process. + +It assumes there is a single worker process feeding off of it. + +Each row in the replication stream consists of a type and some json, where the +types indicate whether they are presence, or edus, etc. + +Ephemeral or non-event data are queued up in-memory. When the worker requests +updates since a particular point, all in-memory data since before that point is +dropped. We also expire things in the queue after 5 minutes, to ensure that a +dead worker doesn't cause the queues to grow limitlessly. + +Events are replicated via a separate events stream. +""" + from .units import Edu from blist import sorteddict @@ -27,6 +43,7 @@ DEVICE_MESSAGE_TYPE = "d" class FederationRemoteSendQueue(object): + """A drop in replacement for TransactionQueue""" def __init__(self, hs): self.server_name = hs.hostname @@ -58,6 +75,7 @@ class FederationRemoteSendQueue(object): return pos def _clear_queue(self): + """Clear the queues for anything older than N minutes""" # TODO measure this function time. FIVE_MINUTES_AGO = 5 * 60 * 1000 @@ -75,6 +93,7 @@ class FederationRemoteSendQueue(object): self._clear_queue_before_pos(position_to_delete) def _clear_queue_before_pos(self, position_to_delete): + """Clear all the queues from before a given position""" # Delete things out of presence maps keys = self.presence_changed.keys() i = keys.bisect_left(position_to_delete) @@ -122,9 +141,13 @@ class FederationRemoteSendQueue(object): del self.device_messages[key] def notify_new_events(self, current_id): + """As per TransactionQueue""" + # We don't need to replicate this as it gets sent down a different + # stream. pass def send_edu(self, destination, edu_type, content, key=None): + """As per TransactionQueue""" pos = self._next_pos() edu = Edu( @@ -142,6 +165,7 @@ class FederationRemoteSendQueue(object): self.edus[pos] = edu def send_presence(self, destination, states): + """As per TransactionQueue""" pos = self._next_pos() self.presence_map.update({ @@ -154,11 +178,13 @@ class FederationRemoteSendQueue(object): ] def send_failure(self, failure, destination): + """As per TransactionQueue""" pos = self._next_pos() self.failures[pos] = (destination, str(failure)) def send_device_messages(self, destination): + """As per TransactionQueue""" pos = self._next_pos() self.device_messages[pos] = destination diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 0b3fdc1067..c94c74a67e 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -131,6 +131,9 @@ class TransactionQueue(object): @defer.inlineCallbacks def notify_new_events(self, current_id): + """This gets called when we have some new events we might want to + send out to other servers. + """ self._last_poked_id = max(current_id, self._last_poked_id) if self._is_processing: -- cgit 1.4.1 From 4c79a63fd76e982e5e60b22c7efd15b6e3cf9915 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Nov 2016 10:40:44 +0000 Subject: Explicit federation ack --- synapse/app/federation_sender.py | 5 ++++- synapse/federation/send_queue.py | 13 +++++++++++-- synapse/replication/resource.py | 15 ++++++++++----- 3 files changed, 25 insertions(+), 8 deletions(-) (limited to 'synapse/app/federation_sender.py') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index ba2b4c2615..dcdbe79a17 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -235,7 +235,10 @@ class FederationSenderHandler(object): @defer.inlineCallbacks def stream_positions(self): stream_id = yield self.store.get_federation_out_pos("federation") - defer.returnValue({"federation": stream_id}) + defer.returnValue({ + "federation": stream_id, + "federation_ack": stream_id, + }) @defer.inlineCallbacks def process_replication(self, result): diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index ed2b03fad4..5c9f7a86f0 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -213,7 +213,15 @@ class FederationRemoteSendQueue(object): def get_current_token(self): return self.pos - 1 - def get_replication_rows(self, token, limit): + def get_replication_rows(self, token, limit, federation_ack=None): + """ + Args: + token (int) + limit (int) + federation_ack (int): Optional. The position where the worker is + explicitly acknowledged it has handled. Allows us to drop + data from before that point + """ # TODO: Handle limit. # To handle restarts where we wrap around @@ -224,7 +232,8 @@ class FederationRemoteSendQueue(object): # There should be only one reader, so lets delete everything its # acknowledged its seen. - self._clear_queue_before_pos(token) + if federation_ack: + self._clear_queue_before_pos(federation_ack) # Fetch changed presence keys = self.presence_changed.keys() diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index b05ca62710..cb9697e378 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -171,8 +171,13 @@ class ReplicationResource(Resource): } request_streams["streams"] = parse_string(request, "streams") + federation_ack = parse_integer(request, "federation_ack", None) + def replicate(): - return self.replicate(request_streams, limit) + return self.replicate( + request_streams, limit, + federation_ack=federation_ack + ) writer = yield self.notifier.wait_for_replication(replicate, timeout) result = writer.finish() @@ -190,7 +195,7 @@ class ReplicationResource(Resource): finish_request(request) @defer.inlineCallbacks - def replicate(self, request_streams, limit): + def replicate(self, request_streams, limit, federation_ack=None): writer = _Writer() current_token = yield self.current_replication_token() logger.debug("Replicating up to %r", current_token) @@ -209,7 +214,7 @@ class ReplicationResource(Resource): yield self.caches(writer, current_token, limit, request_streams) yield self.to_device(writer, current_token, limit, request_streams) yield self.public_rooms(writer, current_token, limit, request_streams) - self.federation(writer, current_token, limit, request_streams) + self.federation(writer, current_token, limit, request_streams, federation_ack) self.streams(writer, current_token, request_streams) logger.debug("Replicated %d rows", writer.total) @@ -473,7 +478,7 @@ class ReplicationResource(Resource): "position", "room_id", "visibility" ), position=upto_token) - def federation(self, writer, current_token, limit, request_streams): + def federation(self, writer, current_token, limit, request_streams, federation_ack): if self.config.send_federation: return @@ -483,7 +488,7 @@ class ReplicationResource(Resource): if federation is not None and federation != current_position: federation_rows = self.federation_sender.get_replication_rows( - federation, limit, + federation, limit, federation_ack=federation_ack, ) upto_token = _position_from_rows(federation_rows, current_position) writer.write_header_and_rows("federation", federation_rows, ( -- cgit 1.4.1 From 4d9b5c60f958320fb80f968a312eb83cf48258d5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Nov 2016 11:11:41 +0000 Subject: Comment --- synapse/app/federation_sender.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/app/federation_sender.py') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index dcdbe79a17..80ea4c8062 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -237,6 +237,9 @@ class FederationSenderHandler(object): stream_id = yield self.store.get_federation_out_pos("federation") defer.returnValue({ "federation": stream_id, + + # Ack stuff we've "processed", this should only be called from + # one process. "federation_ack": stream_id, }) -- cgit 1.4.1