diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
new file mode 100644
index 0000000000..80ea4c8062
--- /dev/null
+++ b/synapse/app/federation_sender.py
@@ -0,0 +1,331 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.server import HomeServer
+from synapse.config._base import ConfigError
+from synapse.config.logger import setup_logging
+from synapse.config.homeserver import HomeServerConfig
+from synapse.crypto import context_factory
+from synapse.http.site import SynapseSite
+from synapse.federation import send_queue
+from synapse.federation.units import Edu
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.storage.engines import create_engine
+from synapse.storage.presence import UserPresenceState
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+
+from synapse import events
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import gc
+import ujson as json
+
+logger = logging.getLogger("synapse.app.federation_sender")
+
+
+class FederationSenderSlaveStore(
+ SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
+ SlavedRegistrationStore,
+):
+ pass
+
+
+class FederationSenderServer(HomeServer):
+ def get_db_conn(self, run_new_connection=True):
+ # Any param beginning with cp_ is a parameter for adbapi, and should
+ # not be passed to the database engine.
+ db_params = {
+ k: v for k, v in self.db_config.get("args", {}).items()
+ if not k.startswith("cp_")
+ }
+ db_conn = self.database_engine.module.connect(**db_params)
+
+ if run_new_connection:
+ self.database_engine.on_new_connection(db_conn)
+ return db_conn
+
+ def setup(self):
+ logger.info("Setting up.")
+ self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self)
+ logger.info("Finished setting up.")
+
+ def _listen_http(self, listener_config):
+ port = listener_config["port"]
+ bind_address = listener_config.get("bind_address", "")
+ site_tag = listener_config.get("tag", port)
+ resources = {}
+ for res in listener_config["resources"]:
+ for name in res["names"]:
+ if name == "metrics":
+ resources[METRICS_PREFIX] = MetricsResource(self)
+
+ root_resource = create_resource_tree(resources, Resource())
+ reactor.listenTCP(
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ ),
+ interface=bind_address
+ )
+ logger.info("Synapse federation_sender now listening on port %d", port)
+
+ def start_listening(self, listeners):
+ for listener in listeners:
+ if listener["type"] == "http":
+ self._listen_http(listener)
+ elif listener["type"] == "manhole":
+ reactor.listenTCP(
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
+ ),
+ interface=listener.get("bind_address", '127.0.0.1')
+ )
+ else:
+ logger.warn("Unrecognized listener type: %s", listener["type"])
+
+ @defer.inlineCallbacks
+ def replicate(self):
+ http_client = self.get_simple_http_client()
+ store = self.get_datastore()
+ replication_url = self.config.worker_replication_url
+ send_handler = FederationSenderHandler(self)
+
+ send_handler.on_start()
+
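+ # Long-poll the master's replication endpoint: send our current stream
+ # positions (with a 30s timeout), apply whatever rows come back, and
+ # hand the federation rows to the send handler.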
+ while True:
+ try:
+ args = store.stream_positions()
+ args.update((yield send_handler.stream_positions()))
+ args["timeout"] = 30000
+ result = yield http_client.get_json(replication_url, args=args)
+ yield store.process_replication(result)
+ yield send_handler.process_replication(result)
+ except Exception:
+ logger.exception("Error replicating from %r", replication_url)
+ yield sleep(30)
+
+
+def start(config_options):
+ try:
+ config = HomeServerConfig.load_config(
+ "Synapse federation sender", config_options
+ )
+ except ConfigError as e:
+ sys.stderr.write("\n" + e.message + "\n")
+ sys.exit(1)
+
+ assert config.worker_app == "synapse.app.federation_sender"
+
+ setup_logging(config.worker_log_config, config.worker_log_file)
+
+ events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+ database_engine = create_engine(config.database_config)
+
+ if config.send_federation:
+ sys.stderr.write(
+ "\nThe send_federation must be disabled in the main synapse process"
+ "\nbefore they can be run in a separate worker."
+ "\nPlease add ``send_federation: false`` to the main config"
+ "\n"
+ )
+ sys.exit(1)
+
+ # Force the federation sender to start since it will be disabled in the main config
+ config.send_federation = True
+
+ tls_server_context_factory = context_factory.ServerContextFactory(config)
+
+ ps = FederationSenderServer(
+ config.server_name,
+ db_config=config.database_config,
+ tls_server_context_factory=tls_server_context_factory,
+ config=config,
+ version_string="Synapse/" + get_version_string(synapse),
+ database_engine=database_engine,
+ )
+
+ ps.setup()
+ ps.start_listening(config.worker_listeners)
+
+ def run():
+ with LoggingContext("run"):
+ logger.info("Running")
+ change_resource_limit(config.soft_file_limit)
+ if config.gc_thresholds:
+ gc.set_threshold(*config.gc_thresholds)
+ reactor.run()
+
+ def start():
+ ps.replicate()
+ ps.get_datastore().start_profiling()
+ ps.get_state_handler().start_caching()
+
+ reactor.callWhenRunning(start)
+
+ if config.worker_daemonize:
+ daemon = Daemonize(
+ app="synapse-federation-sender",
+ pid=config.worker_pid_file,
+ action=run,
+ auto_close_fds=False,
+ verbose=True,
+ logger=logger,
+ )
+ daemon.start()
+ else:
+ run()
+
+
+class FederationSenderHandler(object):
+ """Processes the replication stream and forwards the appropriate entries
+ to the federation sender.
+ """
+ def __init__(self, hs):
+ self.store = hs.get_datastore()
+ self.federation_sender = hs.get_federation_sender()
+
+ self._room_serials = {}
+ self._room_typing = {}
+
+ def on_start(self):
+ # There may be some events that are persisted but haven't been sent,
+ # so send them now.
+ self.federation_sender.notify_new_events(
+ self.store.get_room_max_stream_ordering()
+ )
+
+ @defer.inlineCallbacks
+ def stream_positions(self):
+ stream_id = yield self.store.get_federation_out_pos("federation")
+ defer.returnValue({
+ "federation": stream_id,
+
+ # Ack stuff we've "processed"; this should only be called from
+ # one process.
+ "federation_ack": stream_id,
+ })
+
+ @defer.inlineCallbacks
+ def process_replication(self, result):
+ # The federation stream contains things that we want to send out, e.g.
+ # presence, typing, etc.
+ fed_stream = result.get("federation")
+ if fed_stream:
+ latest_id = int(fed_stream["position"])
+
+ # The federation stream contains a bunch of different types of
+ # rows that need to be handled differently. We parse the rows, put
+ # them into the appropriate collection and then send them off.
+ presence_to_send = {}
+ keyed_edus = {}
+ edus = {}
+ failures = {}
+ device_destinations = set()
+
+ # Parse the rows in the stream
+ for row in fed_stream["rows"]:
+ position, typ, content_js = row
+ content = json.loads(content_js)
+
+ if typ == send_queue.PRESENCE_TYPE:
+ destination = content["destination"]
+ state = UserPresenceState.from_dict(content["state"])
+
+ presence_to_send.setdefault(destination, []).append(state)
+ elif typ == send_queue.KEYED_EDU_TYPE:
+ key = content["key"]
+ edu = Edu(**content["edu"])
+
+ keyed_edus.setdefault(
+ edu.destination, {}
+ )[(edu.destination, tuple(key))] = edu
+ elif typ == send_queue.EDU_TYPE:
+ edu = Edu(**content)
+
+ edus.setdefault(edu.destination, []).append(edu)
+ elif typ == send_queue.FAILURE_TYPE:
+ destination = content["destination"]
+ failure = content["failure"]
+
+ failures.setdefault(destination, []).append(failure)
+ elif typ == send_queue.DEVICE_MESSAGE_TYPE:
+ device_destinations.add(content["destination"])
+ else:
+ raise Exception("Unrecognised federation type: %r" % (typ,))
+
+ # We've finished collecting, send everything off
+ for destination, states in presence_to_send.items():
+ self.federation_sender.send_presence(destination, states)
+
+ for destination, edu_map in keyed_edus.items():
+ for key, edu in edu_map.items():
+ self.federation_sender.send_edu(
+ edu.destination, edu.edu_type, edu.content, key=key,
+ )
+
+ for destination, edu_list in edus.items():
+ for edu in edu_list:
+ self.federation_sender.send_edu(
+ edu.destination, edu.edu_type, edu.content, key=None,
+ )
+
+ for destination, failure_list in failures.items():
+ for failure in failure_list:
+ self.federation_sender.send_failure(destination, failure)
+
+ for destination in device_destinations:
+ self.federation_sender.send_device_messages(destination)
+
+ # Record where we are in the stream.
+ yield self.store.update_federation_out_pos(
+ "federation", latest_id
+ )
+
+ # We also need to poke the federation sender when new events happen
+ event_stream = result.get("events")
+ if event_stream:
+ latest_pos = event_stream["position"]
+ self.federation_sender.notify_new_events(latest_pos)
+
+
+if __name__ == '__main__':
+ with LoggingContext("main"):
+ start(sys.argv[1:])
diff --git a/synapse/config/server.py b/synapse/config/server.py
index ed5417d0c3..634d8e6fe5 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -30,6 +30,11 @@ class ServerConfig(Config):
self.use_frozen_dicts = config.get("use_frozen_dicts", False)
self.public_baseurl = config.get("public_baseurl")
+ # Whether to send federation traffic out in this process. This only
+ # applies to some federation traffic, and so shouldn't be used to
+ # "disable" federation
+ self.send_federation = config.get("send_federation", True)
+
if self.public_baseurl is not None:
if self.public_baseurl[-1] != '/':
self.public_baseurl += '/'
diff --git a/synapse/federation/__init__.py b/synapse/federation/__init__.py
index 979fdf2431..2e32d245ba 100644
--- a/synapse/federation/__init__.py
+++ b/synapse/federation/__init__.py
@@ -17,10 +17,9 @@
"""
from .replication import ReplicationLayer
-from .transport.client import TransportLayerClient
-def initialize_http_replication(homeserver):
- transport = TransportLayerClient(homeserver)
+def initialize_http_replication(hs):
+ transport = hs.get_federation_transport_client()
- return ReplicationLayer(homeserver, transport)
+ return ReplicationLayer(hs, transport)
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 94e76b1978..b255709165 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -18,7 +18,6 @@ from twisted.internet import defer
from .federation_base import FederationBase
from synapse.api.constants import Membership
-from .units import Edu
from synapse.api.errors import (
CodeMessageException, HttpResponseException, SynapseError,
@@ -45,10 +44,6 @@ logger = logging.getLogger(__name__)
# synapse.federation.federation_client is a silly name
metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
-sent_pdus_destination_dist = metrics.register_distribution("sent_pdu_destinations")
-
-sent_edus_counter = metrics.register_counter("sent_edus")
-
sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
@@ -93,63 +88,6 @@ class FederationClient(FederationBase):
self._get_pdu_cache.start()
@log_function
- def send_pdu(self, pdu, destinations):
- """Informs the replication layer about a new PDU generated within the
- home server that should be transmitted to others.
-
- TODO: Figure out when we should actually resolve the deferred.
-
- Args:
- pdu (Pdu): The new Pdu.
-
- Returns:
- Deferred: Completes when we have successfully processed the PDU
- and replicated it to any interested remote home servers.
- """
- order = self._order
- self._order += 1
-
- sent_pdus_destination_dist.inc_by(len(destinations))
-
- logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id)
-
- # TODO, add errback, etc.
- self._transaction_queue.enqueue_pdu(pdu, destinations, order)
-
- logger.debug(
- "[%s] transaction_layer.enqueue_pdu... done",
- pdu.event_id
- )
-
- def send_presence(self, destination, states):
- if destination != self.server_name:
- self._transaction_queue.enqueue_presence(destination, states)
-
- @log_function
- def send_edu(self, destination, edu_type, content, key=None):
- edu = Edu(
- origin=self.server_name,
- destination=destination,
- edu_type=edu_type,
- content=content,
- )
-
- sent_edus_counter.inc()
-
- self._transaction_queue.enqueue_edu(edu, key=key)
-
- @log_function
- def send_device_messages(self, destination):
- """Sends the device messages in the local database to the remote
- destination"""
- self._transaction_queue.enqueue_device_messages(destination)
-
- @log_function
- def send_failure(self, failure, destination):
- self._transaction_queue.enqueue_failure(failure, destination)
- return defer.succeed(None)
-
- @log_function
def make_query(self, destination, query_type, args,
retry_on_dns_fail=False):
"""Sends a federation Query to a remote homeserver of the given type
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index ea66a5dcbc..62d865ec4b 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -20,8 +20,6 @@ a given transport.
from .federation_client import FederationClient
from .federation_server import FederationServer
-from .transaction_queue import TransactionQueue
-
from .persistence import TransactionActions
import logging
@@ -66,9 +64,6 @@ class ReplicationLayer(FederationClient, FederationServer):
self._clock = hs.get_clock()
self.transaction_actions = TransactionActions(self.store)
- self._transaction_queue = TransactionQueue(hs, transport_layer)
-
- self._order = 0
self.hs = hs
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
new file mode 100644
index 0000000000..5c9f7a86f0
--- /dev/null
+++ b/synapse/federation/send_queue.py
@@ -0,0 +1,298 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014-2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A federation sender that forwards things to be sent across replication to
+a worker process.
+
+It assumes there is a single worker process feeding off of it.
+
+Each row in the replication stream consists of a type and some json, where the
+type indicates whether the row is presence, an edu, a failure, etc.
+
+Ephemeral or non-event data are queued up in-memory. When the worker requests
+updates since a particular point, all in-memory data from before that point is
+dropped. We also expire things in the queue after 5 minutes, to ensure that a
+dead worker doesn't cause the queues to grow limitlessly.
+
+Events are replicated via a separate events stream.
+"""
+
+from .units import Edu
+
+from synapse.util.metrics import Measure
+import synapse.metrics
+
+from blist import sorteddict
+import ujson
+
+
+metrics = synapse.metrics.get_metrics_for(__name__)
+
+
+PRESENCE_TYPE = "p"
+KEYED_EDU_TYPE = "k"
+EDU_TYPE = "e"
+FAILURE_TYPE = "f"
+DEVICE_MESSAGE_TYPE = "d"
+
+
+class FederationRemoteSendQueue(object):
+ """A drop in replacement for TransactionQueue"""
+
+ def __init__(self, hs):
+ self.server_name = hs.hostname
+ self.clock = hs.get_clock()
+
+ self.presence_map = {}
+ self.presence_changed = sorteddict()
+
+ self.keyed_edu = {}
+ self.keyed_edu_changed = sorteddict()
+
+ self.edus = sorteddict()
+
+ self.failures = sorteddict()
+
+ self.device_messages = sorteddict()
+
+ self.pos = 1
+ self.pos_time = sorteddict()
+
+ # EVERYTHING IS SAD. In particular, python only makes new scopes when
+ # we make a new function, so we need to make a new function so the inner
+ # lambda binds to the queue rather than to the name of the queue which
+ # changes. ARGH.
+ def register(name, queue):
+ metrics.register_callback(
+ queue_name + "_size",
+ lambda: len(queue),
+ )
+
+ for queue_name in [
+ "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
+ "edus", "failures", "device_messages", "pos_time",
+ ]:
+ register(queue_name, getattr(self, queue_name))
+
+ self.clock.looping_call(self._clear_queue, 30 * 1000)
+
+ def _next_pos(self):
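+ # Remember when each position was allocated, so that _clear_queue can
+ # expire everything from before a cut-off time.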
+ pos = self.pos
+ self.pos += 1
+ self.pos_time[self.clock.time_msec()] = pos
+ return pos
+
+ def _clear_queue(self):
+ """Clear the queues for anything older than N minutes"""
+
+ FIVE_MINUTES_AGO = 5 * 60 * 1000
+ now = self.clock.time_msec()
+
+ keys = self.pos_time.keys()
+ i = keys.bisect_left(now - FIVE_MINUTES_AGO)
+ if not keys[:i]:
+ return
+
+ position_to_delete = max(keys[:i])
+ for key in keys[:i]:
+ del self.pos_time[key]
+
+ self._clear_queue_before_pos(position_to_delete)
+
+ def _clear_queue_before_pos(self, position_to_delete):
+ """Clear all the queues from before a given position"""
+ with Measure(self.clock, "send_queue._clear"):
+ # Delete things out of presence maps
+ keys = self.presence_changed.keys()
+ i = keys.bisect_left(position_to_delete)
+ for key in keys[:i]:
+ del self.presence_changed[key]
+
+ user_ids = set(
+ user_id for uids in self.presence_changed.values() for _, user_id in uids
+ )
+
+ to_del = [
+ user_id for user_id in self.presence_map if user_id not in user_ids
+ ]
+ for user_id in to_del:
+ del self.presence_map[user_id]
+
+ # Delete things out of keyed edus
+ keys = self.keyed_edu_changed.keys()
+ i = keys.bisect_left(position_to_delete)
+ for key in keys[:i]:
+ del self.keyed_edu_changed[key]
+
+ live_keys = set()
+ for edu_key in self.keyed_edu_changed.values():
+ live_keys.add(edu_key)
+
+ to_del = [edu_key for edu_key in self.keyed_edu if edu_key not in live_keys]
+ for edu_key in to_del:
+ del self.keyed_edu[edu_key]
+
+ # Delete things out of edu map
+ keys = self.edus.keys()
+ i = keys.bisect_left(position_to_delete)
+ for key in keys[:i]:
+ del self.edus[key]
+
+ # Delete things out of failure map
+ keys = self.failures.keys()
+ i = keys.bisect_left(position_to_delete)
+ for key in keys[:i]:
+ del self.failures[key]
+
+ # Delete things out of device map
+ keys = self.device_messages.keys()
+ i = keys.bisect_left(position_to_delete)
+ for key in keys[:i]:
+ del self.device_messages[key]
+
+ def notify_new_events(self, current_id):
+ """As per TransactionQueue"""
+ # We don't need to replicate this as it gets sent down a different
+ # stream.
+ pass
+
+ def send_edu(self, destination, edu_type, content, key=None):
+ """As per TransactionQueue"""
+ pos = self._next_pos()
+
+ edu = Edu(
+ origin=self.server_name,
+ destination=destination,
+ edu_type=edu_type,
+ content=content,
+ )
+
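+ # Keyed EDUs clobber any earlier queued EDU with the same
+ # (destination, key), so only the most recent one is replicated.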
+ if key:
+ assert isinstance(key, tuple)
+ self.keyed_edu[(destination, key)] = edu
+ self.keyed_edu_changed[pos] = (destination, key)
+ else:
+ self.edus[pos] = edu
+
+ def send_presence(self, destination, states):
+ """As per TransactionQueue"""
+ pos = self._next_pos()
+
+ self.presence_map.update({
+ state.user_id: state
+ for state in states
+ })
+
+ self.presence_changed[pos] = [
+ (destination, state.user_id) for state in states
+ ]
+
+ def send_failure(self, failure, destination):
+ """As per TransactionQueue"""
+ pos = self._next_pos()
+
+ self.failures[pos] = (destination, str(failure))
+
+ def send_device_messages(self, destination):
+ """As per TransactionQueue"""
+ pos = self._next_pos()
+ self.device_messages[pos] = destination
+
+ def get_current_token(self):
+ return self.pos - 1
+
+ def get_replication_rows(self, token, limit, federation_ack=None):
+ """
+ Args:
+ token (int)
+ limit (int)
+ federation_ack (int): Optional. The position up to which the worker
+ has explicitly acknowledged that it has processed. Allows us to
+ drop data from before that point.
+ """
+ # TODO: Handle limit.
+
+ # To handle restarts where we wrap around
+ if token > self.pos:
+ token = -1
+
+ rows = []
+
+ # There should be only one reader, so let's delete everything it's
+ # acknowledged it has seen.
+ if federation_ack:
+ self._clear_queue_before_pos(federation_ack)
+
+ # Fetch changed presence
+ keys = self.presence_changed.keys()
+ i = keys.bisect_right(token)
+ dest_user_ids = set(
+ (pos, dest_user_id)
+ for pos in keys[i:]
+ for dest_user_id in self.presence_changed[pos]
+ )
+
+ for (key, (dest, user_id)) in dest_user_ids:
+ rows.append((key, PRESENCE_TYPE, ujson.dumps({
+ "destination": dest,
+ "state": self.presence_map[user_id].as_dict(),
+ })))
+
+ # Fetch changed keyed edus
+ keys = self.keyed_edu_changed.keys()
+ i = keys.bisect_right(token)
+ keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:])
+
+ for (pos, (destination, edu_key)) in keyed_edus:
+ rows.append(
+ (pos, KEYED_EDU_TYPE, ujson.dumps({
+ "key": edu_key,
+ "edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(),
+ }))
+ )
+
+ # Fetch changed edus
+ keys = self.edus.keys()
+ i = keys.bisect_right(token)
+ edus = set((k, self.edus[k]) for k in keys[i:])
+
+ for (pos, edu) in edus:
+ rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict())))
+
+ # Fetch changed failures
+ keys = self.failures.keys()
+ i = keys.bisect_right(token)
+ failures = set((k, self.failures[k]) for k in keys[i:])
+
+ for (pos, (destination, failure)) in failures:
+ rows.append((pos, FAILURE_TYPE, ujson.dumps({
+ "destination": destination,
+ "failure": failure,
+ })))
+
+ # Fetch changed device messages
+ keys = self.device_messages.keys()
+ i = keys.bisect_right(token)
+ device_messages = set((k, self.device_messages[k]) for k in keys[i:])
+
+ for (pos, destination) in device_messages:
+ rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({
+ "destination": destination,
+ })))
+
+ # Sort rows based on pos
+ rows.sort()
+
+ return rows
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index f8ca93e4c3..c94c74a67e 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
from .persistence import TransactionActions
from .units import Transaction, Edu
+from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import HttpResponseException
from synapse.util.async import run_on_reactor
from synapse.util.logcontext import preserve_context_over_fn
@@ -26,6 +27,7 @@ from synapse.util.retryutils import (
get_retry_limiter, NotRetryingDestination,
)
from synapse.util.metrics import measure_func
+from synapse.types import get_domain_from_id
from synapse.handlers.presence import format_user_presence_state
import synapse.metrics
@@ -36,6 +38,12 @@ logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
+client_metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
+sent_pdus_destination_dist = client_metrics.register_distribution(
+ "sent_pdu_destinations"
+)
+sent_edus_counter = client_metrics.register_counter("sent_edus")
+
class TransactionQueue(object):
"""This class makes sure we only have one transaction in flight at
@@ -44,13 +52,14 @@ class TransactionQueue(object):
It batches pending PDUs into single transactions.
"""
- def __init__(self, hs, transport_layer):
+ def __init__(self, hs):
self.server_name = hs.hostname
self.store = hs.get_datastore()
+ self.state = hs.get_state_handler()
self.transaction_actions = TransactionActions(self.store)
- self.transport_layer = transport_layer
+ self.transport_layer = hs.get_federation_transport_client()
self.clock = hs.get_clock()
@@ -95,6 +104,11 @@ class TransactionQueue(object):
# HACK to get unique tx id
self._next_txn_id = int(self.clock.time_msec())
+ self._order = 1
+
+ self._is_processing = False
+ self._last_poked_id = -1
+
def can_send_to(self, destination):
"""Can we send messages to the given server?
@@ -115,11 +129,61 @@ class TransactionQueue(object):
else:
return not destination.startswith("localhost")
- def enqueue_pdu(self, pdu, destinations, order):
+ @defer.inlineCallbacks
+ def notify_new_events(self, current_id):
+ """This gets called when we have some new events we might want to
+ send out to other servers.
+ """
+ self._last_poked_id = max(current_id, self._last_poked_id)
+
+ if self._is_processing:
+ return
+
+ try:
+ self._is_processing = True
+ while True:
+ last_token = yield self.store.get_federation_out_pos("events")
+ next_token, events = yield self.store.get_all_new_events_stream(
+ last_token, self._last_poked_id, limit=20,
+ )
+
+ logger.debug("Handling %s -> %s", last_token, next_token)
+
+ if not events and next_token >= self._last_poked_id:
+ break
+
+ for event in events:
+ users_in_room = yield self.state.get_current_user_in_room(
+ event.room_id, latest_event_ids=[event.event_id],
+ )
+
+ destinations = set(
+ get_domain_from_id(user_id) for user_id in users_in_room
+ )
+
+ if event.type == EventTypes.Member:
+ if event.content["membership"] == Membership.JOIN:
+ destinations.add(get_domain_from_id(event.state_key))
+
+ logger.debug("Sending %s to %r", event, destinations)
+
+ self._send_pdu(event, destinations)
+
+ yield self.store.update_federation_out_pos(
+ "events", next_token
+ )
+
+ finally:
+ self._is_processing = False
+
+ def _send_pdu(self, pdu, destinations):
# We loop through all destinations to see whether we already have
# a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later.
+ order = self._order
+ self._order += 1
+
destinations = set(destinations)
destinations = set(
dest for dest in destinations if self.can_send_to(dest)
@@ -130,6 +194,8 @@ class TransactionQueue(object):
if not destinations:
return
+ sent_pdus_destination_dist.inc_by(len(destinations))
+
for destination in destinations:
self.pending_pdus_by_dest.setdefault(destination, []).append(
(pdu, order)
@@ -139,7 +205,10 @@ class TransactionQueue(object):
self._attempt_new_transaction, destination
)
- def enqueue_presence(self, destination, states):
+ def send_presence(self, destination, states):
+ if not self.can_send_to(destination):
+ return
+
self.pending_presence_by_dest.setdefault(destination, {}).update({
state.user_id: state for state in states
})
@@ -148,12 +217,19 @@ class TransactionQueue(object):
self._attempt_new_transaction, destination
)
- def enqueue_edu(self, edu, key=None):
- destination = edu.destination
+ def send_edu(self, destination, edu_type, content, key=None):
+ edu = Edu(
+ origin=self.server_name,
+ destination=destination,
+ edu_type=edu_type,
+ content=content,
+ )
if not self.can_send_to(destination):
return
+ sent_edus_counter.inc()
+
if key:
self.pending_edus_keyed_by_dest.setdefault(
destination, {}
@@ -165,7 +241,7 @@ class TransactionQueue(object):
self._attempt_new_transaction, destination
)
- def enqueue_failure(self, failure, destination):
+ def send_failure(self, failure, destination):
if destination == self.server_name or destination == "localhost":
return
@@ -180,7 +256,7 @@ class TransactionQueue(object):
self._attempt_new_transaction, destination
)
- def enqueue_device_messages(self, destination):
+ def send_device_messages(self, destination):
if destination == self.server_name or destination == "localhost":
return
@@ -191,6 +267,9 @@ class TransactionQueue(object):
self._attempt_new_transaction, destination
)
+ def get_current_token(self):
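+ # When this process sends federation itself there is no federation
+ # replication stream, so a constant token suffices.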
+ return 0
+
@defer.inlineCallbacks
def _attempt_new_transaction(self, destination):
# list of (pending_pdu, deferred, order)
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 63d05f2531..5ad408f549 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -24,7 +24,6 @@ from .profile import ProfileHandler
from .directory import DirectoryHandler
from .admin import AdminHandler
from .identity import IdentityHandler
-from .receipts import ReceiptsHandler
from .search import SearchHandler
@@ -56,7 +55,6 @@ class Handlers(object):
self.profile_handler = ProfileHandler(hs)
self.directory_handler = DirectoryHandler(hs)
self.admin_handler = AdminHandler(hs)
- self.receipts_handler = ReceiptsHandler(hs)
self.identity_handler = IdentityHandler(hs)
self.search_handler = SearchHandler(hs)
self.room_context_handler = RoomContextHandler(hs)
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index c5368e5df2..f7fad15c62 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -34,9 +34,9 @@ class DeviceMessageHandler(object):
self.store = hs.get_datastore()
self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id
- self.federation = hs.get_replication_layer()
+ self.federation = hs.get_federation_sender()
- self.federation.register_edu_handler(
+ hs.get_replication_layer().register_edu_handler(
"m.direct_to_device", self.on_direct_to_device_edu
)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2d801bad47..771ab3bc43 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -80,22 +80,6 @@ class FederationHandler(BaseHandler):
# When joining a room we need to queue any events for that room up
self.room_queues = {}
- def handle_new_event(self, event, destinations):
- """ Takes in an event from the client to server side, that has already
- been authed and handled by the state module, and sends it to any
- remote home servers that may be interested.
-
- Args:
- event: The event to send
- destinations: A list of destinations to send it to
-
- Returns:
- Deferred: Resolved when it has successfully been queued for
- processing.
- """
-
- return self.replication_layer.send_pdu(event, destinations)
-
@log_function
@defer.inlineCallbacks
def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None):
@@ -830,25 +814,6 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
- new_pdu = event
-
- users_in_room = yield self.store.get_joined_users_from_context(event, context)
-
- destinations = set(
- get_domain_from_id(user_id) for user_id in users_in_room
- if not self.hs.is_mine_id(user_id)
- )
-
- destinations.discard(origin)
-
- logger.debug(
- "on_send_join_request: Sending event: %s, signatures: %s",
- event.event_id,
- event.signatures,
- )
-
- self.replication_layer.send_pdu(new_pdu, destinations)
-
state_ids = context.prev_state_ids.values()
auth_chain = yield self.store.get_auth_chain(set(
[event.event_id] + state_ids
@@ -1055,24 +1020,6 @@ class FederationHandler(BaseHandler):
event, event_stream_id, max_stream_id, extra_users=extra_users
)
- new_pdu = event
-
- users_in_room = yield self.store.get_joined_users_from_context(event, context)
-
- destinations = set(
- get_domain_from_id(user_id) for user_id in users_in_room
- if not self.hs.is_mine_id(user_id)
- )
- destinations.discard(origin)
-
- logger.debug(
- "on_send_leave_request: Sending event: %s, signatures: %s",
- event.event_id,
- event.signatures,
- )
-
- self.replication_layer.send_pdu(new_pdu, destinations)
-
defer.returnValue(None)
@defer.inlineCallbacks
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index fbfa5a0281..e0ade4c164 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -372,11 +372,12 @@ class InitialSyncHandler(BaseHandler):
@defer.inlineCallbacks
def get_receipts():
- receipts_handler = self.hs.get_handlers().receipts_handler
- receipts = yield receipts_handler.get_receipts_for_room(
+ receipts = yield self.store.get_linearized_receipts_for_room(
room_id,
- now_token.receipt_key
+ to_key=now_token.receipt_key,
)
+ if not receipts:
+ receipts = []
defer.returnValue(receipts)
presence, receipts, (messages, token) = yield defer.gatherResults(
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 81df45177a..fd09397226 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -22,7 +22,7 @@ from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.push.action_generator import ActionGenerator
from synapse.types import (
- UserID, RoomAlias, RoomStreamToken, get_domain_from_id
+ UserID, RoomAlias, RoomStreamToken,
)
from synapse.util.async import run_on_reactor, ReadWriteLock
from synapse.util.logcontext import preserve_fn
@@ -599,13 +599,6 @@ class MessageHandler(BaseHandler):
event_stream_id, max_stream_id
)
- users_in_room = yield self.store.get_joined_users_from_context(event, context)
-
- destinations = [
- get_domain_from_id(user_id) for user_id in users_in_room
- if not self.hs.is_mine_id(user_id)
- ]
-
@defer.inlineCallbacks
def _notify():
yield run_on_reactor()
@@ -618,7 +611,3 @@ class MessageHandler(BaseHandler):
# If invite, remove room_state from unsigned before sending.
event.unsigned.pop("invite_room_state", None)
-
- preserve_fn(federation_handler.handle_new_event)(
- event, destinations=destinations,
- )
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index b047ae2250..1b89dc6274 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -91,28 +91,29 @@ class PresenceHandler(object):
self.store = hs.get_datastore()
self.wheel_timer = WheelTimer()
self.notifier = hs.get_notifier()
- self.federation = hs.get_replication_layer()
+ self.replication = hs.get_replication_layer()
+ self.federation = hs.get_federation_sender()
self.state = hs.get_state_handler()
- self.federation.register_edu_handler(
+ self.replication.register_edu_handler(
"m.presence", self.incoming_presence
)
- self.federation.register_edu_handler(
+ self.replication.register_edu_handler(
"m.presence_invite",
lambda origin, content: self.invite_presence(
observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]),
)
)
- self.federation.register_edu_handler(
+ self.replication.register_edu_handler(
"m.presence_accept",
lambda origin, content: self.accept_presence(
observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]),
)
)
- self.federation.register_edu_handler(
+ self.replication.register_edu_handler(
"m.presence_deny",
lambda origin, content: self.deny_presence(
observed_user=UserID.from_string(content["observed_user"]),
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index e536a909d0..916e80a48e 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -33,8 +33,8 @@ class ReceiptsHandler(BaseHandler):
self.server_name = hs.config.server_name
self.store = hs.get_datastore()
self.hs = hs
- self.federation = hs.get_replication_layer()
- self.federation.register_edu_handler(
+ self.federation = hs.get_federation_sender()
+ hs.get_replication_layer().register_edu_handler(
"m.receipt", self._received_remote_receipt
)
self.clock = self.hs.get_clock()
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 27ee715ff0..0eea7f8f9c 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -55,9 +55,9 @@ class TypingHandler(object):
self.clock = hs.get_clock()
self.wheel_timer = WheelTimer(bucket_size=5000)
- self.federation = hs.get_replication_layer()
+ self.federation = hs.get_federation_sender()
- self.federation.register_edu_handler("m.typing", self._recv_edu)
+ hs.get_replication_layer().register_edu_handler("m.typing", self._recv_edu)
hs.get_distributor().observe("user_left_room", self.user_left_room)
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 48653ae843..054ca59ad2 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -143,6 +143,12 @@ class Notifier(object):
self.clock = hs.get_clock()
self.appservice_handler = hs.get_application_service_handler()
+
+ if hs.should_send_federation():
+ self.federation_sender = hs.get_federation_sender()
+ else:
+ self.federation_sender = None
+
self.state_handler = hs.get_state_handler()
self.clock.looping_call(
@@ -220,6 +226,9 @@ class Notifier(object):
# poke any interested application service.
self.appservice_handler.notify_interested_services(room_stream_id)
+ if self.federation_sender:
+ self.federation_sender.notify_new_events(room_stream_id)
+
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
self._user_joined_room(event.state_key, event.room_id)
diff --git a/synapse/replication/expire_cache.py b/synapse/replication/expire_cache.py
new file mode 100644
index 0000000000..c05a50d7a6
--- /dev/null
+++ b/synapse/replication/expire_cache.py
@@ -0,0 +1,60 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.http.server import respond_with_json_bytes, request_handler
+from synapse.http.servlet import parse_json_object_from_request
+
+from twisted.web.resource import Resource
+from twisted.web.server import NOT_DONE_YET
+
+
+class ExpireCacheResource(Resource):
+ """
+ HTTP endpoint for expiring storage caches.
+
+ POST /_synapse/replication/expire_cache HTTP/1.1
+ Content-Type: application/json
+
+ {
+ "invalidate": [
+ {
+ "name": "func_name",
+ "keys": ["key1", "key2"]
+ }
+ ]
+ }
+ """
+
+ def __init__(self, hs):
+ Resource.__init__(self) # Resource is old-style, so no super()
+
+ self.store = hs.get_datastore()
+ self.version_string = hs.version_string
+ self.clock = hs.get_clock()
+
+ def render_POST(self, request):
+ self._async_render_POST(request)
+ return NOT_DONE_YET
+
+ @request_handler()
+ def _async_render_POST(self, request):
+ content = parse_json_object_from_request(request)
+
+ for row in content["invalidate"]:
+ name = row["name"]
+ keys = tuple(row["keys"])
+
+ getattr(self.store, name).invalidate(keys)
+
+ respond_with_json_bytes(request, 200, "{}")
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index 5a14c51d23..d79b421cba 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -17,6 +17,7 @@ from synapse.http.servlet import parse_integer, parse_string
from synapse.http.server import request_handler, finish_request
from synapse.replication.pusher_resource import PusherResource
from synapse.replication.presence_resource import PresenceResource
+from synapse.replication.expire_cache import ExpireCacheResource
from synapse.api.errors import SynapseError
from twisted.web.resource import Resource
@@ -44,6 +45,7 @@ STREAM_NAMES = (
("caches",),
("to_device",),
("public_rooms",),
+ ("federation",),
)
@@ -116,11 +118,14 @@ class ReplicationResource(Resource):
self.sources = hs.get_event_sources()
self.presence_handler = hs.get_presence_handler()
self.typing_handler = hs.get_typing_handler()
+ self.federation_sender = hs.get_federation_sender()
self.notifier = hs.notifier
self.clock = hs.get_clock()
+ self.config = hs.get_config()
self.putChild("remove_pushers", PusherResource(hs))
self.putChild("syncing_users", PresenceResource(hs))
+ self.putChild("expire_cache", ExpireCacheResource(hs))
def render_GET(self, request):
self._async_render_GET(request)
@@ -134,6 +139,7 @@ class ReplicationResource(Resource):
pushers_token = self.store.get_pushers_stream_token()
caches_token = self.store.get_cache_stream_token()
public_rooms_token = self.store.get_current_public_room_stream_id()
+ federation_token = self.federation_sender.get_current_token()
defer.returnValue(_ReplicationToken(
room_stream_token,
@@ -148,6 +154,7 @@ class ReplicationResource(Resource):
caches_token,
int(stream_token.to_device_key),
int(public_rooms_token),
+ int(federation_token),
))
@request_handler()
@@ -164,8 +171,13 @@ class ReplicationResource(Resource):
}
request_streams["streams"] = parse_string(request, "streams")
+ federation_ack = parse_integer(request, "federation_ack", None)
+
def replicate():
- return self.replicate(request_streams, limit)
+ return self.replicate(
+ request_streams, limit,
+ federation_ack=federation_ack
+ )
writer = yield self.notifier.wait_for_replication(replicate, timeout)
result = writer.finish()
@@ -183,7 +195,7 @@ class ReplicationResource(Resource):
finish_request(request)
@defer.inlineCallbacks
- def replicate(self, request_streams, limit):
+ def replicate(self, request_streams, limit, federation_ack=None):
writer = _Writer()
current_token = yield self.current_replication_token()
logger.debug("Replicating up to %r", current_token)
@@ -202,6 +214,7 @@ class ReplicationResource(Resource):
yield self.caches(writer, current_token, limit, request_streams)
yield self.to_device(writer, current_token, limit, request_streams)
yield self.public_rooms(writer, current_token, limit, request_streams)
+ self.federation(writer, current_token, limit, request_streams, federation_ack)
self.streams(writer, current_token, request_streams)
logger.debug("Replicated %d rows", writer.total)
@@ -465,6 +478,23 @@ class ReplicationResource(Resource):
"position", "room_id", "visibility"
), position=upto_token)
+ def federation(self, writer, current_token, limit, request_streams, federation_ack):
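+ # If this process sends federation itself there is no send queue to
+ # replicate, so leave the federation stream empty.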
+ if self.config.send_federation:
+ return
+
+ current_position = current_token.federation
+
+ federation = request_streams.get("federation")
+
+ if federation is not None and federation != current_position:
+ federation_rows = self.federation_sender.get_replication_rows(
+ federation, limit, federation_ack=federation_ack,
+ )
+ upto_token = _position_from_rows(federation_rows, current_position)
+ writer.write_header_and_rows("federation", federation_rows, (
+ "position", "type", "content",
+ ), position=upto_token)
+
class _Writer(object):
"""Writes the streams as a JSON object as the response to the request"""
@@ -497,6 +527,7 @@ class _Writer(object):
class _ReplicationToken(collections.namedtuple("_ReplicationToken", (
"events", "presence", "typing", "receipts", "account_data", "backfill",
"push_rules", "pushers", "state", "caches", "to_device", "public_rooms",
+ "federation",
))):
__slots__ = []
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index f19540d6bb..18076e0f3b 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -34,6 +34,9 @@ class BaseSlavedStore(SQLBaseStore):
else:
self._cache_id_gen = None
+ self.expire_cache_url = hs.config.worker_replication_url + "/expire_cache"
+ self.http_client = hs.get_simple_http_client()
+
def stream_positions(self):
pos = {}
if self._cache_id_gen:
@@ -54,3 +57,19 @@ class BaseSlavedStore(SQLBaseStore):
logger.info("Got unexpected cache_func: %r", cache_func)
self._cache_id_gen.advance(int(stream["position"]))
return defer.succeed(None)
+
+ def _invalidate_cache_and_stream(self, txn, cache_func, keys):
+ txn.call_after(cache_func.invalidate, keys)
+ txn.call_after(self._send_invalidation_poke, cache_func, keys)
+
+ @defer.inlineCallbacks
+ def _send_invalidation_poke(self, cache_func, keys):
+ try:
+ yield self.http_client.post_json_get_json(self.expire_cache_url, {
+ "invalidate": [{
+ "name": cache_func.__name__,
+ "keys": list(keys),
+ }]
+ })
+ except Exception:
+ logger.exception("Failed to poke on expire_cache")
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index 3bfd5e8213..cc860f9f9b 100644
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -29,10 +29,16 @@ class SlavedDeviceInboxStore(BaseSlavedStore):
"DeviceInboxStreamChangeCache",
self._device_inbox_id_gen.get_current_token()
)
+ self._device_federation_outbox_stream_cache = StreamChangeCache(
+ "DeviceFederationOutboxStreamChangeCache",
+ self._device_inbox_id_gen.get_current_token()
+ )
get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__
get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__
+ get_new_device_msgs_for_remote = DataStore.get_new_device_msgs_for_remote.__func__
delete_messages_for_device = DataStore.delete_messages_for_device.__func__
+ delete_device_msgs_for_remote = DataStore.delete_device_msgs_for_remote.__func__
def stream_positions(self):
result = super(SlavedDeviceInboxStore, self).stream_positions()
@@ -45,9 +51,15 @@ class SlavedDeviceInboxStore(BaseSlavedStore):
self._device_inbox_id_gen.advance(int(stream["position"]))
for row in stream["rows"]:
stream_id = row[0]
- user_id = row[1]
- self._device_inbox_stream_cache.entity_has_changed(
- user_id, stream_id
- )
+ entity = row[1]
+
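+ # The entity is either a local user_id (rows from device_inbox)
+ # or a remote destination (rows from device_federation_outbox);
+ # only user IDs start with "@".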
+ if entity.startswith("@"):
+ self._device_inbox_stream_cache.entity_has_changed(
+ entity, stream_id
+ )
+ else:
+ self._device_federation_outbox_stream_cache.entity_has_changed(
+ entity, stream_id
+ )
return super(SlavedDeviceInboxStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 0c26e96e98..64f18bbb3e 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -26,6 +26,11 @@ from synapse.storage.stream import StreamStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
import ujson as json
+import logging
+
+
+logger = logging.getLogger(__name__)
+
# So, um, we want to borrow a load of functions intended for reading from
# a DataStore, but we don't want to take functions that either write to the
@@ -180,6 +185,11 @@ class SlavedEventStore(BaseSlavedStore):
EventFederationStore.__dict__["_get_forward_extremeties_for_room"]
)
+ get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__
+
+ get_federation_out_pos = DataStore.get_federation_out_pos.__func__
+ update_federation_out_pos = DataStore.update_federation_out_pos.__func__
+
def stream_positions(self):
result = super(SlavedEventStore, self).stream_positions()
result["events"] = self._stream_id_gen.get_current_token()
@@ -194,6 +204,10 @@ class SlavedEventStore(BaseSlavedStore):
stream = result.get("events")
if stream:
self._stream_id_gen.advance(int(stream["position"]))
+
+ if stream["rows"]:
+ logger.info("Got %d event rows", len(stream["rows"]))
+
for row in stream["rows"]:
self._process_replication_row(
row, backfilled=False, state_resets=state_resets
diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py
index 6f2ba98af5..fbb58f35da 100644
--- a/synapse/replication/slave/storage/transactions.py
+++ b/synapse/replication/slave/storage/transactions.py
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
from ._base import BaseSlavedStore
from synapse.storage import DataStore
from synapse.storage.transactions import TransactionStore
@@ -22,9 +21,10 @@ from synapse.storage.transactions import TransactionStore
class TransactionStore(BaseSlavedStore):
get_destination_retry_timings = TransactionStore.__dict__[
"get_destination_retry_timings"
- ].orig
+ ]
_get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__
+ set_destination_retry_timings = DataStore.set_destination_retry_timings.__func__
+ _set_destination_retry_timings = DataStore._set_destination_retry_timings.__func__
- # For now, don't record the destination rety timings
- def set_destination_retry_timings(*args, **kwargs):
- return defer.succeed(None)
+ prep_send_transaction = DataStore.prep_send_transaction.__func__
+ delivered_txn = DataStore.delivered_txn.__func__
diff --git a/synapse/rest/client/v2_alpha/receipts.py b/synapse/rest/client/v2_alpha/receipts.py
index 891cef99c6..1fbff2edd8 100644
--- a/synapse/rest/client/v2_alpha/receipts.py
+++ b/synapse/rest/client/v2_alpha/receipts.py
@@ -36,7 +36,7 @@ class ReceiptRestServlet(RestServlet):
super(ReceiptRestServlet, self).__init__()
self.hs = hs
self.auth = hs.get_auth()
- self.receipts_handler = hs.get_handlers().receipts_handler
+ self.receipts_handler = hs.get_receipts_handler()
self.presence_handler = hs.get_presence_handler()
@defer.inlineCallbacks
diff --git a/synapse/server.py b/synapse/server.py
index 374124a147..0bfb411269 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -32,6 +32,9 @@ from synapse.appservice.scheduler import ApplicationServiceScheduler
from synapse.crypto.keyring import Keyring
from synapse.events.builder import EventBuilderFactory
from synapse.federation import initialize_http_replication
+from synapse.federation.send_queue import FederationRemoteSendQueue
+from synapse.federation.transport.client import TransportLayerClient
+from synapse.federation.transaction_queue import TransactionQueue
from synapse.handlers import Handlers
from synapse.handlers.appservice import ApplicationServicesHandler
from synapse.handlers.auth import AuthHandler
@@ -44,6 +47,7 @@ from synapse.handlers.sync import SyncHandler
from synapse.handlers.typing import TypingHandler
from synapse.handlers.events import EventHandler, EventStreamHandler
from synapse.handlers.initial_sync import InitialSyncHandler
+from synapse.handlers.receipts import ReceiptsHandler
from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.notifier import Notifier
@@ -124,6 +128,9 @@ class HomeServer(object):
'http_client_context_factory',
'simple_http_client',
'media_repository',
+ 'federation_transport_client',
+ 'federation_sender',
+ 'receipts_handler',
]
def __init__(self, hostname, **kwargs):
@@ -265,9 +272,30 @@ class HomeServer(object):
def build_media_repository(self):
return MediaRepository(self)
+ def build_federation_transport_client(self):
+ return TransportLayerClient(self)
+
+ def build_federation_sender(self):
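+ # Three cases: this process should send federation itself (the main
+ # process with send_federation enabled, or the federation_sender
+ # worker); the main process is deferring sending to a worker, so
+ # queue traffic up for replication; or this is some other worker,
+ # which must not send federation at all.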
+ if self.should_send_federation():
+ return TransactionQueue(self)
+ elif not self.config.worker_app:
+ return FederationRemoteSendQueue(self)
+ else:
+ raise Exception("Workers cannot send federation traffic")
+
+ def build_receipts_handler(self):
+ return ReceiptsHandler(self)
+
def remove_pusher(self, app_id, push_key, user_id):
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
+ def should_send_federation(self):
+ "Should this server be sending federation traffic directly?"
+ return self.config.send_federation and (
+ not self.config.worker_app
+ or self.config.worker_app == "synapse.app.federation_sender"
+ )
+
def _make_dependency_method(depname):
def _get(hs):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index d828d6ee1d..b62c459d8b 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -561,12 +561,17 @@ class SQLBaseStore(object):
@staticmethod
def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
+ if keyvalues:
+ where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
+ else:
+ where = ""
+
sql = (
- "SELECT %(retcol)s FROM %(table)s WHERE %(where)s"
+ "SELECT %(retcol)s FROM %(table)s %(where)s"
) % {
"retcol": retcol,
"table": table,
- "where": " AND ".join("%s = ?" % k for k in keyvalues.keys()),
+ "where": where,
}
txn.execute(sql, keyvalues.values())
@@ -744,10 +749,15 @@ class SQLBaseStore(object):
@staticmethod
def _simple_update_one_txn(txn, table, keyvalues, updatevalues):
- update_sql = "UPDATE %s SET %s WHERE %s" % (
+ if keyvalues:
+ where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
+ else:
+ where = ""
+
+ update_sql = "UPDATE %s SET %s %s" % (
table,
", ".join("%s = ?" % (k,) for k in updatevalues),
- " AND ".join("%s = ?" % (k,) for k in keyvalues)
+ where,
)
txn.execute(
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index f640e73714..87398d60bc 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -269,27 +269,29 @@ class DeviceInboxStore(SQLBaseStore):
return defer.succeed([])
def get_all_new_device_messages_txn(txn):
+ # We limit like this as we might have multiple rows per stream_id, and
+ # we want to make sure we always get all entries for any stream_id
+ # we return.
+ upper_pos = min(current_pos, last_pos + limit)
sql = (
- "SELECT stream_id FROM device_inbox"
+ "SELECT stream_id, user_id"
+ " FROM device_inbox"
" WHERE ? < stream_id AND stream_id <= ?"
- " GROUP BY stream_id"
" ORDER BY stream_id ASC"
- " LIMIT ?"
)
- txn.execute(sql, (last_pos, current_pos, limit))
- stream_ids = txn.fetchall()
- if not stream_ids:
- return []
- max_stream_id_in_limit = stream_ids[-1]
+ txn.execute(sql, (last_pos, upper_pos))
+ rows = txn.fetchall()
sql = (
- "SELECT stream_id, user_id, device_id, message_json"
- " FROM device_inbox"
+ "SELECT stream_id, destination"
+ " FROM device_federation_outbox"
" WHERE ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC"
)
- txn.execute(sql, (last_pos, max_stream_id_in_limit))
- return txn.fetchall()
+ txn.execute(sql, (last_pos, upper_pos))
+ rows.extend(txn.fetchall())
+
+ return rows
return self.runInteraction(
"get_all_new_device_messages", get_all_new_device_messages_txn
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 6576a30098..e46ae6502e 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 38
+SCHEMA_VERSION = 39
dir_path = os.path.abspath(os.path.dirname(__file__))
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 21d0696640..7460f98a1f 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -37,6 +37,13 @@ class UserPresenceState(namedtuple("UserPresenceState",
status_msg (str): User set status message.
"""
+ def as_dict(self):
+ return dict(self._asdict())
+
+ @staticmethod
+ def from_dict(d):
+ return UserPresenceState(**d)
+
def copy_and_replace(self, **kwargs):
return self._replace(**kwargs)
diff --git a/synapse/storage/schema/delta/39/device_federation_stream_idx.sql b/synapse/storage/schema/delta/39/device_federation_stream_idx.sql
new file mode 100644
index 0000000000..00be801e90
--- /dev/null
+++ b/synapse/storage/schema/delta/39/device_federation_stream_idx.sql
@@ -0,0 +1,16 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE INDEX device_federation_outbox_id ON device_federation_outbox(stream_id);
diff --git a/synapse/storage/schema/delta/39/federation_out_position.sql b/synapse/storage/schema/delta/39/federation_out_position.sql
new file mode 100644
index 0000000000..edbd8e132f
--- /dev/null
+++ b/synapse/storage/schema/delta/39/federation_out_position.sql
@@ -0,0 +1,22 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE federation_stream_position(
+ type TEXT NOT NULL,
+ stream_id INTEGER NOT NULL
+);
+
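+-- -1 marks that no position has been persisted for either stream yet.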
+INSERT INTO federation_stream_position (type, stream_id) VALUES ('federation', -1);
+INSERT INTO federation_stream_position (type, stream_id) VALUES ('events', -1);
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 888b1cb35d..7fa63b58a7 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -765,3 +765,50 @@ class StreamStore(SQLBaseStore):
"token": end_token,
},
}
+
+ @defer.inlineCallbacks
+ def get_all_new_events_stream(self, from_id, current_id, limit):
+ """Get all new events"""
+
+ def get_all_new_events_stream_txn(txn):
+ sql = (
+ "SELECT e.stream_ordering, e.event_id"
+ " FROM events AS e"
+ " WHERE"
+ " ? < e.stream_ordering AND e.stream_ordering <= ?"
+ " ORDER BY e.stream_ordering ASC"
+ " LIMIT ?"
+ )
+
+ txn.execute(sql, (from_id, current_id, limit))
+ rows = txn.fetchall()
+
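+ # If we filled the limit there may be more rows in range, so only
+ # advance the position to the last row we actually returned.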
+ upper_bound = current_id
+ if len(rows) == limit:
+ upper_bound = rows[-1][0]
+
+ return upper_bound, [row[1] for row in rows]
+
+ upper_bound, event_ids = yield self.runInteraction(
+ "get_all_new_events_stream", get_all_new_events_stream_txn,
+ )
+
+ events = yield self._get_events(event_ids)
+
+ defer.returnValue((upper_bound, events))
+
+ def get_federation_out_pos(self, typ):
+ return self._simple_select_one_onecol(
+ table="federation_stream_position",
+ retcol="stream_id",
+ keyvalues={"type": typ},
+ desc="get_federation_out_pos"
+ )
+
+ def update_federation_out_pos(self, typ, stream_id):
+ return self._simple_update_one(
+ table="federation_stream_position",
+ keyvalues={"type": typ},
+ updatevalues={"stream_id": stream_id},
+ desc="update_federation_out_pos",
+ )
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index adab520c78..809fdd311f 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -200,25 +200,48 @@ class TransactionStore(SQLBaseStore):
def _set_destination_retry_timings(self, txn, destination,
retry_last_ts, retry_interval):
- txn.call_after(self.get_destination_retry_timings.invalidate, (destination,))
+ self.database_engine.lock_table(txn, "destinations")
- self._simple_upsert_txn(
+ self._invalidate_cache_and_stream(
+ txn, self.get_destination_retry_timings, (destination,)
+ )
+
+ # We need to be careful here as the data may have changed from under us
+ # due to a worker setting the timings.
+
+ prev_row = self._simple_select_one_txn(
txn,
- "destinations",
+ table="destinations",
keyvalues={
"destination": destination,
},
- values={
- "retry_last_ts": retry_last_ts,
- "retry_interval": retry_interval,
- },
- insertion_values={
- "destination": destination,
- "retry_last_ts": retry_last_ts,
- "retry_interval": retry_interval,
- }
+ retcols=("retry_last_ts", "retry_interval"),
+ allow_none=True,
)
+ if not prev_row:
+ self._simple_insert_txn(
+ txn,
+ table="destinations",
+ values={
+ "destination": destination,
+ "retry_last_ts": retry_last_ts,
+ "retry_interval": retry_interval,
+ }
+ )
+ elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval:
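+ # Only overwrite the row when resetting after a success
+ # (retry_interval == 0) or when backing off further; otherwise a
+ # worker has already recorded a longer interval and we keep it.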
+ self._simple_update_one_txn(
+ txn,
+ "destinations",
+ keyvalues={
+ "destination": destination,
+ },
+ updatevalues={
+ "retry_last_ts": retry_last_ts,
+ "retry_interval": retry_interval,
+ },
+ )
+
def get_destinations_needing_retry(self):
"""Get all destinations which are due a retry for sending a transaction.
diff --git a/synapse/util/jsonobject.py b/synapse/util/jsonobject.py
index 3fd5c3d9fd..d668e5a6b8 100644
--- a/synapse/util/jsonobject.py
+++ b/synapse/util/jsonobject.py
@@ -76,15 +76,26 @@ class JsonEncodedObject(object):
d.update(self.unrecognized_keys)
return d
+ def get_internal_dict(self):
+ d = {
+ k: _encode(v, internal=True) for (k, v) in self.__dict__.items()
+ if k in self.valid_keys
+ }
+ d.update(self.unrecognized_keys)
+ return d
+
def __str__(self):
return "(%s, %s)" % (self.__class__.__name__, repr(self.__dict__))
-def _encode(obj):
+def _encode(obj, internal=False):
if type(obj) is list:
- return [_encode(o) for o in obj]
+ return [_encode(o, internal=internal) for o in obj]
if isinstance(obj, JsonEncodedObject):
- return obj.get_dict()
+ if internal:
+ return obj.get_internal_dict()
+ else:
+ return obj.get_dict()
return obj
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 49527f4d21..46ef5a8ec7 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -121,12 +121,6 @@ class RetryDestinationLimiter(object):
pass
def __exit__(self, exc_type, exc_val, exc_tb):
- def err(failure):
- logger.exception(
- "Failed to store set_destination_retry_timings",
- failure.value
- )
-
valid_err_code = False
if exc_type is not None and issubclass(exc_type, CodeMessageException):
valid_err_code = 0 <= exc_val.code < 500
@@ -151,6 +145,15 @@ class RetryDestinationLimiter(object):
retry_last_ts = int(self.clock.time_msec())
- self.store.set_destination_retry_timings(
- self.destination, retry_last_ts, self.retry_interval
- ).addErrback(err)
+ @defer.inlineCallbacks
+ def store_retry_timings():
+ try:
+ yield self.store.set_destination_retry_timings(
+ self.destination, retry_last_ts, self.retry_interval
+ )
+ except Exception:
+ logger.exception(
+ "Failed to store set_destination_retry_timings",
+ )
+
+ store_retry_timings()
|