diff --git a/synapse/replication/tcp/__init__.py b/synapse/replication/tcp/__init__.py
new file mode 100644
index 0000000000..451dae3b6c
--- /dev/null
+++ b/synapse/replication/tcp/__init__.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
new file mode 100644
index 0000000000..07adf9412e
--- /dev/null
+++ b/synapse/replication/tcp/streams.py
@@ -0,0 +1,409 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Defines all the valid streams that clients can subscribe to, and the format
+of the rows returned by each stream.
+
+Each stream is defined by the following information:
+
+ stream name: The name of the stream
+ row type: The type that is used to serialise/deserialse the row
+ current_token: The function that returns the current token for the stream
+ update_function: The function that returns a list of updates between two tokens
+"""
+
+from twisted.internet import defer
+from collections import namedtuple
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+MAX_EVENTS_BEHIND = 10000
+
+
+EventStreamRow = namedtuple("EventStreamRow",
+ ("event_id", "room_id", "type", "state_key", "redacts"))
+BackfillStreamRow = namedtuple("BackfillStreamRow",
+ ("event_id", "room_id", "type", "state_key", "redacts"))
+PresenceStreamRow = namedtuple("PresenceStreamRow",
+ ("user_id", "state", "last_active_ts",
+ "last_federation_update_ts", "last_user_sync_ts",
+ "status_msg", "currently_active"))
+TypingStreamRow = namedtuple("TypingStreamRow",
+ ("room_id", "user_ids"))
+ReceiptsStreamRow = namedtuple("ReceiptsStreamRow",
+ ("room_id", "receipt_type", "user_id", "event_id",
+ "data"))
+PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",))
+PushersStreamRow = namedtuple("PushersStreamRow",
+ ("user_id", "app_id", "pushkey", "deleted",))
+CachesStreamRow = namedtuple("CachesStreamRow",
+ ("cache_func", "keys", "invalidation_ts",))
+PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow",
+ ("room_id", "visibility", "appservice_id",
+ "network_id",))
+DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", ("user_id", "destination",))
+ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",))
+FederationStreamRow = namedtuple("FederationStreamRow", ("type", "data",))
+TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow",
+ ("user_id", "room_id", "data"))
+AccountDataStreamRow = namedtuple("AccountDataStream",
+ ("user_id", "room_id", "data_type", "data"))
+
+
+class Stream(object):
+ """Base class for the streams.
+
+ Provides a `get_updates()` function that returns new updates since the last
+ time it was called up until the point `advance_current_token` was called.
+ """
+ NAME = None # The name of the stream
+ ROW_TYPE = None # The type of the row
+ _LIMITED = True # Whether the update function takes a limit
+
+ def __init__(self, hs):
+ # The token from which we last asked for updates
+ self.last_token = self.current_token()
+
+ # The token that we will get updates up to
+ self.upto_token = self.current_token()
+
+ def advance_current_token(self):
+ """Updates `upto_token` to "now", which updates up until which point
+ get_updates[_since] will fetch rows till.
+ """
+ self.upto_token = self.current_token()
+
+ @defer.inlineCallbacks
+ def get_updates(self):
+ """Gets all updates since the last time this function was called (or
+ since the stream was constructed if it hadn't been called before),
+ until the `upto_token`
+
+ Returns:
+ (list(ROW_TYPE), int): list of updates plus the token used as an
+ upper bound of the updates (i.e. the "current token")
+ """
+ updates, current_token = yield self.get_updates_since(self.last_token)
+ self.last_token = current_token
+
+ defer.returnValue((updates, current_token))
+
+ @defer.inlineCallbacks
+ def get_updates_since(self, from_token):
+ """Like get_updates except allows specifying from when we should
+ stream updates
+
+ Returns:
+ (list(ROW_TYPE), int): list of updates plus the token used as an
+ upper bound of the updates (i.e. the "current token")
+ """
+ if from_token in ("NOW", "now"):
+ defer.returnValue(([], self.upto_token))
+
+ current_token = self.upto_token
+
+ from_token = int(from_token)
+
+ if from_token == current_token:
+ defer.returnValue(([], current_token))
+
+ if self._LIMITED:
+ rows = yield self.update_function(
+ from_token, current_token,
+ limit=MAX_EVENTS_BEHIND + 1,
+ )
+
+ if len(rows) >= MAX_EVENTS_BEHIND:
+ raise Exception("stream %s has fallen behined" % (self.NAME))
+ else:
+ rows = yield self.update_function(
+ from_token, current_token,
+ )
+
+ updates = [(row[0], self.ROW_TYPE(*row[1:])) for row in rows]
+
+ defer.returnValue((updates, current_token))
+
+ def current_token(self):
+ """Gets the current token of the underlying streams. Should be provided
+ by the sub classes
+
+ Returns:
+ int
+ """
+ raise NotImplementedError()
+
+ def update_function(self, from_token, current_token, limit=None):
+ """Get updates between from_token and to_token. If Stream._LIMITED is
+ True then limit is provided, otherwise it's not.
+
+ Returns:
+ list(tuple): the first entry in the tuple is the token for that
+ update, and the rest of the tuple gets used to construct
+ a ``ROW_TYPE`` instance
+ """
+ raise NotImplementedError()
+
+
+class EventsStream(Stream):
+ """We received a new event, or an event went from being an outlier to not
+ """
+ NAME = "events"
+ ROW_TYPE = EventStreamRow
+
+ def __init__(self, hs):
+ store = hs.get_datastore()
+ self.current_token = store.get_current_events_token
+ self.update_function = store.get_all_new_forward_event_rows
+
+ super(EventsStream, self).__init__(hs)
+
+
+class BackfillStream(Stream):
+ """We fetched some old events and either we had never seen that event before
+ or it went from being an outlier to not.
+ """
+ NAME = "backfill"
+ ROW_TYPE = BackfillStreamRow
+
+ def __init__(self, hs):
+ store = hs.get_datastore()
+ self.current_token = store.get_current_backfill_token
+ self.update_function = store.get_all_new_backfill_event_rows
+
+ super(BackfillStream, self).__init__(hs)
+
+
+class PresenceStream(Stream):
+ NAME = "presence"
+ _LIMITED = False
+ ROW_TYPE = PresenceStreamRow
+
+ def __init__(self, hs):
+ store = hs.get_datastore()
+ presence_handler = hs.get_presence_handler()
+
+ self.current_token = store.get_current_presence_token
+ self.update_function = presence_handler.get_all_presence_updates
+
+ super(PresenceStream, self).__init__(hs)
+
+
+class TypingStream(Stream):
+ NAME = "typing"
+ _LIMITED = False
+ ROW_TYPE = TypingStreamRow
+
+ def __init__(self, hs):
+ typing_handler = hs.get_typing_handler()
+
+ self.current_token = typing_handler.get_current_token
+ self.update_function = typing_handler.get_all_typing_updates
+
+ super(TypingStream, self).__init__(hs)
+
+
+class ReceiptsStream(Stream):
+ NAME = "receipts"
+ ROW_TYPE = ReceiptsStreamRow
+
+ def __init__(self, hs):
+ store = hs.get_datastore()
+
+ self.current_token = store.get_max_receipt_stream_id
+ self.update_function = store.get_all_updated_receipts
+
+ super(ReceiptsStream, self).__init__(hs)
+
+
+class PushRulesStream(Stream):
+ """A user has changed their push rules
+ """
+ NAME = "push_rules"
+ ROW_TYPE = PushRulesStreamRow
+
+ def __init__(self, hs):
+ self.store = hs.get_datastore()
+ super(PushRulesStream, self).__init__(hs)
+
+ def current_token(self):
+ push_rules_token, _ = self.store.get_push_rules_stream_token()
+ return push_rules_token
+
+ @defer.inlineCallbacks
+ def update_function(self, from_token, to_token, limit):
+ rows = yield self.store.get_all_push_rule_updates(from_token, to_token, limit)
+ defer.returnValue([(row[0], row[2]) for row in rows])
+
+
+class PushersStream(Stream):
+ """A user has added/changed/removed a pusher
+ """
+ NAME = "pushers"
+ ROW_TYPE = PushersStreamRow
+
+ def __init__(self, hs):
+ store = hs.get_datastore()
+
+ self.current_token = store.get_pushers_stream_token
+ self.update_function = store.get_all_updated_pushers_rows
+
+ super(PushersStream, self).__init__(hs)
+
+
+class CachesStream(Stream):
+ """A cache was invalidated on the master and no other stream would invalidate
+ the cache on the workers
+ """
+ NAME = "caches"
+ ROW_TYPE = CachesStreamRow
+
+ def __init__(self, hs):
+ store = hs.get_datastore()
+
+ self.current_token = store.get_cache_stream_token
+ self.update_function = store.get_all_updated_caches
+
+ super(CachesStream, self).__init__(hs)
+
+
+class PublicRoomsStream(Stream):
+ """The public rooms list changed
+ """
+ NAME = "public_rooms"
+ ROW_TYPE = PublicRoomsStreamRow
+
+ def __init__(self, hs):
+ store = hs.get_datastore()
+
+ self.current_token = store.get_current_public_room_stream_id
+ self.update_function = store.get_all_new_public_rooms
+
+ super(PublicRoomsStream, self).__init__(hs)
+
+
+class DeviceListsStream(Stream):
+ """Someone added/changed/removed a device
+ """
+ NAME = "device_lists"
+ _LIMITED = False
+ ROW_TYPE = DeviceListsStreamRow
+
+ def __init__(self, hs):
+ store = hs.get_datastore()
+
+ self.current_token = store.get_device_stream_token
+ self.update_function = store.get_all_device_list_changes_for_remotes
+
+ super(DeviceListsStream, self).__init__(hs)
+
+
+class ToDeviceStream(Stream):
+ """New to_device messages for a client
+ """
+ NAME = "to_device"
+ ROW_TYPE = ToDeviceStreamRow
+
+ def __init__(self, hs):
+ store = hs.get_datastore()
+
+ self.current_token = store.get_to_device_stream_token
+ self.update_function = store.get_all_new_device_messages
+
+ super(ToDeviceStream, self).__init__(hs)
+
+
+class FederationStream(Stream):
+ """Data to be sent over federation. Only available when master has federation
+ sending disabled.
+ """
+ NAME = "federation"
+ ROW_TYPE = FederationStreamRow
+
+ def __init__(self, hs):
+ federation_sender = hs.get_federation_sender()
+
+ self.current_token = federation_sender.get_current_token
+ self.update_function = federation_sender.get_replication_rows
+
+ super(FederationStream, self).__init__(hs)
+
+
+class TagAccountDataStream(Stream):
+ """Someone added/removed a tag for a room
+ """
+ NAME = "tag_account_data"
+ ROW_TYPE = TagAccountDataStreamRow
+
+ def __init__(self, hs):
+ store = hs.get_datastore()
+
+ self.current_token = store.get_max_account_data_stream_id
+ self.update_function = store.get_all_updated_tags
+
+ super(TagAccountDataStream, self).__init__(hs)
+
+
+class AccountDataStream(Stream):
+ """Global or per room account data was changed
+ """
+ NAME = "account_data"
+ ROW_TYPE = AccountDataStreamRow
+
+ def __init__(self, hs):
+ self.store = hs.get_datastore()
+
+ self.current_token = self.store.get_max_account_data_stream_id
+
+ super(AccountDataStream, self).__init__(hs)
+
+ @defer.inlineCallbacks
+ def update_function(self, from_token, to_token, limit):
+ global_results, room_results = yield self.store.get_all_updated_account_data(
+ from_token, from_token, to_token, limit
+ )
+
+ results = list(room_results)
+ results.extend(
+ (stream_id, user_id, None, account_data_type, content,)
+ for stream_id, user_id, account_data_type, content in global_results
+ )
+
+ defer.returnValue(results)
+
+
+STREAMS_MAP = {
+ stream.NAME: stream
+ for stream in (
+ EventsStream,
+ BackfillStream,
+ PresenceStream,
+ TypingStream,
+ ReceiptsStream,
+ PushRulesStream,
+ PushersStream,
+ CachesStream,
+ PublicRoomsStream,
+ DeviceListsStream,
+ ToDeviceStream,
+ FederationStream,
+ TagAccountDataStream,
+ AccountDataStream,
+ )
+}
|