diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 32113c175c..6678667c35 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -125,27 +125,22 @@ class FederationSenderServer(HomeServer):
http_client = self.get_simple_http_client()
store = self.get_datastore()
replication_url = self.config.worker_replication_url
- send_handler = self._get_send_handler()
+ send_handler = FederationSenderHandler(self)
+
+ send_handler.on_start()
while True:
try:
args = store.stream_positions()
- args.update(send_handler.stream_positions())
+ args.update((yield send_handler.stream_positions()))
args["timeout"] = 30000
result = yield http_client.get_json(replication_url, args=args)
yield store.process_replication(result)
- send_handler.process_replication(result)
+ yield send_handler.process_replication(result)
except:
logger.exception("Error replicating from %r", replication_url)
yield sleep(30)
- def _get_send_handler(self):
- try:
- return self._send_handler
- except AttributeError:
- self._send_handler = FederationSenderHandler(self)
- return self._send_handler
-
def start(config_options):
try:
@@ -221,22 +216,29 @@ def start(config_options):
class FederationSenderHandler(object):
def __init__(self, hs):
+ self.store = hs.get_datastore()
self.federation_sender = hs.get_federation_sender()
- self._latest_room_serial = -1
self._room_serials = {}
self._room_typing = {}
+ def on_start(self):
+ # There may be some events that are persisted but haven't been sent,
+ # so send them now.
+ self.federation_sender.notify_new_events(
+ self.store.get_room_max_stream_ordering()
+ )
+
+ @defer.inlineCallbacks
def stream_positions(self):
- # We must update this token from the response of the previous
- # sync. In particular, the stream id may "reset" back to zero/a low
- # value which we *must* use for the next replication request.
- return {"federation": self._latest_room_serial}
+ stream_id = yield self.store.get_federation_out_pos("federation")
+ defer.returnValue({"federation": stream_id})
+ @defer.inlineCallbacks
def process_replication(self, result):
fed_stream = result.get("federation")
if fed_stream:
- self._latest_room_serial = int(fed_stream["position"])
+ latest_id = int(fed_stream["position"])
presence_to_send = {}
keyed_edus = {}
@@ -296,6 +298,10 @@ class FederationSenderHandler(object):
for destination in device_destinations:
self.federation_sender.send_device_messages(destination)
+ yield self.store.update_federation_out_pos(
+ "federation", latest_id
+ )
+
event_stream = result.get("events")
if event_stream:
latest_pos = event_stream["position"]
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index aa664beead..1b0ea070c2 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -106,7 +106,7 @@ class TransactionQueue(object):
self._order = 1
self._is_processing = False
- self._last_token = 0
+ self._last_poked_id = -1
def can_send_to(self, destination):
"""Can we send messages to the given server?
@@ -130,17 +130,22 @@ class TransactionQueue(object):
@defer.inlineCallbacks
def notify_new_events(self, current_id):
+ self._last_poked_id = max(current_id, self._last_poked_id)
+
if self._is_processing:
return
try:
self._is_processing = True
while True:
- self._last_token, events = yield self.store.get_all_new_events_stream(
- self._last_token, current_id, limit=20,
+ last_token = yield self.store.get_federation_out_pos("events")
+ next_token, events = yield self.store.get_all_new_events_stream(
+ last_token, self._last_poked_id, limit=20,
)
- if not events:
+ logger.debug("Handling %s -> %s", last_token, next_token)
+
+ if not events and next_token >= self._last_poked_id:
break
for event in events:
@@ -151,7 +156,15 @@ class TransactionQueue(object):
destinations = [
get_domain_from_id(user_id) for user_id in users_in_room
]
+
+ logger.debug("Sending %s to %r", event, destinations)
+
self.send_pdu(event, destinations)
+
+ yield self.store.update_federation_out_pos(
+ "events", next_token
+ )
+
finally:
self._is_processing = False
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index ef8713b55d..64f18bbb3e 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -187,6 +187,9 @@ class SlavedEventStore(BaseSlavedStore):
get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__
+ get_federation_out_pos = DataStore.get_federation_out_pos.__func__
+ update_federation_out_pos = DataStore.update_federation_out_pos.__func__
+
def stream_positions(self):
result = super(SlavedEventStore, self).stream_positions()
result["events"] = self._stream_id_gen.get_current_token()
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index d828d6ee1d..d3686b9690 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -561,12 +561,17 @@ class SQLBaseStore(object):
@staticmethod
def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
+ if keyvalues:
+ where = " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
+ else:
+ where = ""
+
sql = (
- "SELECT %(retcol)s FROM %(table)s WHERE %(where)s"
+ "SELECT %(retcol)s FROM %(table)s %(where)s"
) % {
"retcol": retcol,
"table": table,
- "where": " AND ".join("%s = ?" % k for k in keyvalues.keys()),
+ "where": where,
}
txn.execute(sql, keyvalues.values())
@@ -744,10 +749,15 @@ class SQLBaseStore(object):
@staticmethod
def _simple_update_one_txn(txn, table, keyvalues, updatevalues):
- update_sql = "UPDATE %s SET %s WHERE %s" % (
+ if keyvalues:
+ where = " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
+ else:
+ where = ""
+
+ update_sql = "UPDATE %s SET %s %s" % (
table,
", ".join("%s = ?" % (k,) for k in updatevalues),
- " AND ".join("%s = ?" % (k,) for k in keyvalues)
+ where,
)
txn.execute(
diff --git a/synapse/storage/schema/delta/39/federation_out_position.sql b/synapse/storage/schema/delta/39/federation_out_position.sql
new file mode 100644
index 0000000000..edbd8e132f
--- /dev/null
+++ b/synapse/storage/schema/delta/39/federation_out_position.sql
@@ -0,0 +1,22 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ CREATE TABLE federation_stream_position(
+ type TEXT NOT NULL,
+ stream_id INTEGER NOT NULL
+ );
+
+ INSERT INTO federation_stream_position (type, stream_id) VALUES ('federation', -1);
+ INSERT INTO federation_stream_position (type, stream_id) VALUES ('events', -1);
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index f34cb78f9a..7fa63b58a7 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -796,3 +796,19 @@ class StreamStore(SQLBaseStore):
events = yield self._get_events(event_ids)
defer.returnValue((upper_bound, events))
+
+ def get_federation_out_pos(self, typ):
+ return self._simple_select_one_onecol(
+ table="federation_stream_position",
+ retcol="stream_id",
+ keyvalues={"type": typ},
+ desc="get_federation_out_pos"
+ )
+
+ def update_federation_out_pos(self, typ, stream_id):
+ return self._simple_update_one(
+ table="federation_stream_position",
+ keyvalues={"type": typ},
+ updatevalues={"stream_id": stream_id},
+ desc="update_federation_out_pos",
+ )
|