From 7c9cdb22453d1a442e5c280149aeeff4d46da215 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 21 Nov 2016 11:28:37 +0000 Subject: Store federation stream positions in the database --- synapse/app/federation_sender.py | 38 +++++++++++++--------- synapse/federation/transaction_queue.py | 21 +++++++++--- synapse/replication/slave/storage/events.py | 3 ++ synapse/storage/_base.py | 18 +++++++--- .../schema/delta/39/federation_out_position.sql | 22 +++++++++++++ synapse/storage/stream.py | 16 +++++++++ 6 files changed, 94 insertions(+), 24 deletions(-) create mode 100644 synapse/storage/schema/delta/39/federation_out_position.sql (limited to 'synapse') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 32113c175c..6678667c35 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -125,27 +125,22 @@ class FederationSenderServer(HomeServer): http_client = self.get_simple_http_client() store = self.get_datastore() replication_url = self.config.worker_replication_url - send_handler = self._get_send_handler() + send_handler = FederationSenderHandler(self) + + send_handler.on_start() while True: try: args = store.stream_positions() - args.update(send_handler.stream_positions()) + args.update((yield send_handler.stream_positions())) args["timeout"] = 30000 result = yield http_client.get_json(replication_url, args=args) yield store.process_replication(result) - send_handler.process_replication(result) + yield send_handler.process_replication(result) except: logger.exception("Error replicating from %r", replication_url) yield sleep(30) - def _get_send_handler(self): - try: - return self._send_handler - except AttributeError: - self._send_handler = FederationSenderHandler(self) - return self._send_handler - def start(config_options): try: @@ -221,22 +216,29 @@ def start(config_options): class FederationSenderHandler(object): def __init__(self, hs): + self.store = hs.get_datastore() self.federation_sender = hs.get_federation_sender() - self._latest_room_serial = -1 self._room_serials = {} self._room_typing = {} + def on_start(self): + # There may be some events that are persisted but haven't been sent, + # so send them now. + self.federation_sender.notify_new_events( + self.store.get_room_max_stream_ordering() + ) + + @defer.inlineCallbacks def stream_positions(self): - # We must update this token from the response of the previous - # sync. In particular, the stream id may "reset" back to zero/a low - # value which we *must* use for the next replication request. - return {"federation": self._latest_room_serial} + stream_id = yield self.store.get_federation_out_pos("federation") + defer.returnValue({"federation": stream_id}) + @defer.inlineCallbacks def process_replication(self, result): fed_stream = result.get("federation") if fed_stream: - self._latest_room_serial = int(fed_stream["position"]) + latest_id = int(fed_stream["position"]) presence_to_send = {} keyed_edus = {} @@ -296,6 +298,10 @@ class FederationSenderHandler(object): for destination in device_destinations: self.federation_sender.send_device_messages(destination) + yield self.store.update_federation_out_pos( + "federation", latest_id + ) + event_stream = result.get("events") if event_stream: latest_pos = event_stream["position"] diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index aa664beead..1b0ea070c2 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -106,7 +106,7 @@ class TransactionQueue(object): self._order = 1 self._is_processing = False - self._last_token = 0 + self._last_poked_id = -1 def can_send_to(self, destination): """Can we send messages to the given server? @@ -130,17 +130,22 @@ class TransactionQueue(object): @defer.inlineCallbacks def notify_new_events(self, current_id): + self._last_poked_id = max(current_id, self._last_poked_id) + if self._is_processing: return try: self._is_processing = True while True: - self._last_token, events = yield self.store.get_all_new_events_stream( - self._last_token, current_id, limit=20, + last_token = yield self.store.get_federation_out_pos("events") + next_token, events = yield self.store.get_all_new_events_stream( + last_token, self._last_poked_id, limit=20, ) - if not events: + logger.debug("Handling %s -> %s", last_token, next_token) + + if not events and next_token >= self._last_poked_id: break for event in events: @@ -151,7 +156,15 @@ class TransactionQueue(object): destinations = [ get_domain_from_id(user_id) for user_id in users_in_room ] + + logger.debug("Sending %s to %r", event, destinations) + self.send_pdu(event, destinations) + + yield self.store.update_federation_out_pos( + "events", next_token + ) + finally: self._is_processing = False diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index ef8713b55d..64f18bbb3e 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -187,6 +187,9 @@ class SlavedEventStore(BaseSlavedStore): get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__ + get_federation_out_pos = DataStore.get_federation_out_pos.__func__ + update_federation_out_pos = DataStore.update_federation_out_pos.__func__ + def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() result["events"] = self._stream_id_gen.get_current_token() diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index d828d6ee1d..d3686b9690 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -561,12 +561,17 @@ class SQLBaseStore(object): @staticmethod def _simple_select_onecol_txn(txn, table, keyvalues, retcol): + if keyvalues: + where = " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys()) + else: + where = "" + sql = ( - "SELECT %(retcol)s FROM %(table)s WHERE %(where)s" + "SELECT %(retcol)s FROM %(table)s %(where)s" ) % { "retcol": retcol, "table": table, - "where": " AND ".join("%s = ?" % k for k in keyvalues.keys()), + "where": where, } txn.execute(sql, keyvalues.values()) @@ -744,10 +749,15 @@ class SQLBaseStore(object): @staticmethod def _simple_update_one_txn(txn, table, keyvalues, updatevalues): - update_sql = "UPDATE %s SET %s WHERE %s" % ( + if keyvalues: + where = " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys()) + else: + where = "" + + update_sql = "UPDATE %s SET %s %s" % ( table, ", ".join("%s = ?" % (k,) for k in updatevalues), - " AND ".join("%s = ?" % (k,) for k in keyvalues) + where, ) txn.execute( diff --git a/synapse/storage/schema/delta/39/federation_out_position.sql b/synapse/storage/schema/delta/39/federation_out_position.sql new file mode 100644 index 0000000000..edbd8e132f --- /dev/null +++ b/synapse/storage/schema/delta/39/federation_out_position.sql @@ -0,0 +1,22 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + CREATE TABLE federation_stream_position( + type TEXT NOT NULL, + stream_id INTEGER NOT NULL + ); + + INSERT INTO federation_stream_position (type, stream_id) VALUES ('federation', -1); + INSERT INTO federation_stream_position (type, stream_id) VALUES ('events', -1); diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index f34cb78f9a..7fa63b58a7 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -796,3 +796,19 @@ class StreamStore(SQLBaseStore): events = yield self._get_events(event_ids) defer.returnValue((upper_bound, events)) + + def get_federation_out_pos(self, typ): + return self._simple_select_one_onecol( + table="federation_stream_position", + retcol="stream_id", + keyvalues={"type": typ}, + desc="get_federation_out_pos" + ) + + def update_federation_out_pos(self, typ, stream_id): + return self._simple_update_one( + table="federation_stream_position", + keyvalues={"type": typ}, + updatevalues={"stream_id": stream_id}, + desc="update_federation_out_pos", + ) -- cgit 1.4.1