summary refs log tree commit diff
path: root/synapse/replication/resource.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/resource.py')
-rw-r--r--synapse/replication/resource.py161
1 files changed, 109 insertions, 52 deletions
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index 8c1ae0fbc7..8c2d487ff4 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -15,6 +15,8 @@
 
 from synapse.http.servlet import parse_integer, parse_string
 from synapse.http.server import request_handler, finish_request
+from synapse.replication.pusher_resource import PusherResource
+from synapse.replication.presence_resource import PresenceResource
 
 from twisted.web.resource import Resource
 from twisted.web.server import NOT_DONE_YET
@@ -38,6 +40,7 @@ STREAM_NAMES = (
     ("backfill",),
     ("push_rules",),
     ("pushers",),
+    ("state",),
 )
 
 
@@ -76,7 +79,7 @@ class ReplicationResource(Resource):
     The response is a JSON object with keys for each stream with updates. Under
     each key is a JSON object with:
 
-    * "postion": The current position of the stream.
+    * "position": The current position of the stream.
     * "field_names": The names of the fields in each row.
     * "rows": The updates as an array of arrays.
 
@@ -101,17 +104,19 @@ class ReplicationResource(Resource):
     long-polling this replication API for new data on those streams.
     """
 
-    isLeaf = True
-
     def __init__(self, hs):
         Resource.__init__(self)  # Resource is old-style, so no super()
 
         self.version_string = hs.version_string
         self.store = hs.get_datastore()
         self.sources = hs.get_event_sources()
-        self.presence_handler = hs.get_handlers().presence_handler
-        self.typing_handler = hs.get_handlers().typing_notification_handler
+        self.presence_handler = hs.get_presence_handler()
+        self.typing_handler = hs.get_typing_handler()
         self.notifier = hs.notifier
+        self.clock = hs.get_clock()
+
+        self.putChild("remove_pushers", PusherResource(hs))
+        self.putChild("syncing_users", PresenceResource(hs))
 
     def render_GET(self, request):
         self._async_render_GET(request)
@@ -123,6 +128,7 @@ class ReplicationResource(Resource):
         backfill_token = yield self.store.get_current_backfill_token()
         push_rules_token, room_stream_token = self.store.get_push_rules_stream_token()
         pushers_token = self.store.get_pushers_stream_token()
+        state_token = self.store.get_state_stream_token()
 
         defer.returnValue(_ReplicationToken(
             room_stream_token,
@@ -133,40 +139,62 @@ class ReplicationResource(Resource):
             backfill_token,
             push_rules_token,
             pushers_token,
+            state_token,
         ))
 
-    @request_handler
+    @request_handler()
     @defer.inlineCallbacks
     def _async_render_GET(self, request):
         limit = parse_integer(request, "limit", 100)
         timeout = parse_integer(request, "timeout", 10 * 1000)
 
         request.setHeader(b"Content-Type", b"application/json")
-        writer = _Writer(request)
 
-        @defer.inlineCallbacks
-        def replicate():
-            current_token = yield self.current_replication_token()
-            logger.info("Replicating up to %r", current_token)
+        request_streams = {
+            name: parse_integer(request, name)
+            for names in STREAM_NAMES for name in names
+        }
+        request_streams["streams"] = parse_string(request, "streams")
 
-            yield self.account_data(writer, current_token, limit)
-            yield self.events(writer, current_token, limit)
-            yield self.presence(writer, current_token)  # TODO: implement limit
-            yield self.typing(writer, current_token)  # TODO: implement limit
-            yield self.receipts(writer, current_token, limit)
-            yield self.push_rules(writer, current_token, limit)
-            yield self.pushers(writer, current_token, limit)
-            self.streams(writer, current_token)
+        def replicate():
+            return self.replicate(request_streams, limit)
 
-            logger.info("Replicated %d rows", writer.total)
-            defer.returnValue(writer.total)
+        result = yield self.notifier.wait_for_replication(replicate, timeout)
 
-        yield self.notifier.wait_for_replication(replicate, timeout)
+        for stream_name, stream_content in result.items():
+            logger.info(
+                "Replicating %d rows of %s from %s -> %s",
+                len(stream_content["rows"]),
+                stream_name,
+                request_streams.get(stream_name),
+                stream_content["position"],
+            )
 
-        writer.finish()
+        request.write(json.dumps(result, ensure_ascii=False))
+        finish_request(request)
 
-    def streams(self, writer, current_token):
-        request_token = parse_string(writer.request, "streams")
+    @defer.inlineCallbacks
+    def replicate(self, request_streams, limit):
+        writer = _Writer()
+        current_token = yield self.current_replication_token()
+        logger.info("Replicating up to %r", current_token)
+
+        yield self.account_data(writer, current_token, limit, request_streams)
+        yield self.events(writer, current_token, limit, request_streams)
+        # TODO: implement limit
+        yield self.presence(writer, current_token, request_streams)
+        yield self.typing(writer, current_token, request_streams)
+        yield self.receipts(writer, current_token, limit, request_streams)
+        yield self.push_rules(writer, current_token, limit, request_streams)
+        yield self.pushers(writer, current_token, limit, request_streams)
+        yield self.state(writer, current_token, limit, request_streams)
+        self.streams(writer, current_token, request_streams)
+
+        logger.info("Replicated %d rows", writer.total)
+        defer.returnValue(writer.finish())
+
+    def streams(self, writer, current_token, request_streams):
+        request_token = request_streams.get("streams")
 
         streams = []
 
@@ -191,32 +219,43 @@ class ReplicationResource(Resource):
                 )
 
     @defer.inlineCallbacks
-    def events(self, writer, current_token, limit):
-        request_events = parse_integer(writer.request, "events")
-        request_backfill = parse_integer(writer.request, "backfill")
+    def events(self, writer, current_token, limit, request_streams):
+        request_events = request_streams.get("events")
+        request_backfill = request_streams.get("backfill")
 
         if request_events is not None or request_backfill is not None:
             if request_events is None:
                 request_events = current_token.events
             if request_backfill is None:
                 request_backfill = current_token.backfill
-            events_rows, backfill_rows = yield self.store.get_all_new_events(
+            res = yield self.store.get_all_new_events(
                 request_backfill, request_events,
                 current_token.backfill, current_token.events,
                 limit
             )
+            writer.write_header_and_rows("events", res.new_forward_events, (
+                "position", "internal", "json", "state_group"
+            ))
+            writer.write_header_and_rows("backfill", res.new_backfill_events, (
+                "position", "internal", "json", "state_group"
+            ))
+            writer.write_header_and_rows(
+                "forward_ex_outliers", res.forward_ex_outliers,
+                ("position", "event_id", "state_group")
+            )
             writer.write_header_and_rows(
-                "events", events_rows, ("position", "internal", "json")
+                "backward_ex_outliers", res.backward_ex_outliers,
+                ("position", "event_id", "state_group")
             )
             writer.write_header_and_rows(
-                "backfill", backfill_rows, ("position", "internal", "json")
+                "state_resets", res.state_resets, ("position",)
             )
 
     @defer.inlineCallbacks
-    def presence(self, writer, current_token):
+    def presence(self, writer, current_token, request_streams):
         current_position = current_token.presence
 
-        request_presence = parse_integer(writer.request, "presence")
+        request_presence = request_streams.get("presence")
 
         if request_presence is not None:
             presence_rows = yield self.presence_handler.get_all_presence_updates(
@@ -229,10 +268,10 @@ class ReplicationResource(Resource):
             ))
 
     @defer.inlineCallbacks
-    def typing(self, writer, current_token):
+    def typing(self, writer, current_token, request_streams):
         current_position = current_token.presence
 
-        request_typing = parse_integer(writer.request, "typing")
+        request_typing = request_streams.get("typing")
 
         if request_typing is not None:
             typing_rows = yield self.typing_handler.get_all_typing_updates(
@@ -243,10 +282,10 @@ class ReplicationResource(Resource):
             ))
 
     @defer.inlineCallbacks
-    def receipts(self, writer, current_token, limit):
+    def receipts(self, writer, current_token, limit, request_streams):
         current_position = current_token.receipts
 
-        request_receipts = parse_integer(writer.request, "receipts")
+        request_receipts = request_streams.get("receipts")
 
         if request_receipts is not None:
             receipts_rows = yield self.store.get_all_updated_receipts(
@@ -257,12 +296,12 @@ class ReplicationResource(Resource):
             ))
 
     @defer.inlineCallbacks
-    def account_data(self, writer, current_token, limit):
+    def account_data(self, writer, current_token, limit, request_streams):
         current_position = current_token.account_data
 
-        user_account_data = parse_integer(writer.request, "user_account_data")
-        room_account_data = parse_integer(writer.request, "room_account_data")
-        tag_account_data = parse_integer(writer.request, "tag_account_data")
+        user_account_data = request_streams.get("user_account_data")
+        room_account_data = request_streams.get("room_account_data")
+        tag_account_data = request_streams.get("tag_account_data")
 
         if user_account_data is not None or room_account_data is not None:
             if user_account_data is None:
@@ -288,10 +327,10 @@ class ReplicationResource(Resource):
             ))
 
     @defer.inlineCallbacks
-    def push_rules(self, writer, current_token, limit):
+    def push_rules(self, writer, current_token, limit, request_streams):
         current_position = current_token.push_rules
 
-        push_rules = parse_integer(writer.request, "push_rules")
+        push_rules = request_streams.get("push_rules")
 
         if push_rules is not None:
             rows = yield self.store.get_all_push_rule_updates(
@@ -303,10 +342,11 @@ class ReplicationResource(Resource):
             ))
 
     @defer.inlineCallbacks
-    def pushers(self, writer, current_token, limit):
+    def pushers(self, writer, current_token, limit, request_streams):
         current_position = current_token.pushers
 
-        pushers = parse_integer(writer.request, "pushers")
+        pushers = request_streams.get("pushers")
+
         if pushers is not None:
             updated, deleted = yield self.store.get_all_updated_pushers(
                 pushers, current_position, limit
@@ -316,16 +356,34 @@ class ReplicationResource(Resource):
                 "app_id", "app_display_name", "device_display_name", "pushkey",
                 "ts", "lang", "data"
             ))
-            writer.write_header_and_rows("deleted", deleted, (
+            writer.write_header_and_rows("deleted_pushers", deleted, (
                 "position", "user_id", "app_id", "pushkey"
             ))
 
+    @defer.inlineCallbacks
+    def state(self, writer, current_token, limit, request_streams):
+        current_position = current_token.state
+
+        state = request_streams.get("state")
+
+        if state is not None:
+            state_groups, state_group_state = (
+                yield self.store.get_all_new_state_groups(
+                    state, current_position, limit
+                )
+            )
+            writer.write_header_and_rows("state_groups", state_groups, (
+                "position", "room_id", "event_id"
+            ))
+            writer.write_header_and_rows("state_group_state", state_group_state, (
+                "position", "type", "state_key", "event_id"
+            ))
+
 
 class _Writer(object):
     """Writes the streams as a JSON object as the response to the request"""
-    def __init__(self, request):
+    def __init__(self):
         self.streams = {}
-        self.request = request
         self.total = 0
 
     def write_header_and_rows(self, name, rows, fields, position=None):
@@ -336,7 +394,7 @@ class _Writer(object):
             position = rows[-1][0]
 
         self.streams[name] = {
-            "position": str(position),
+            "position": position if type(position) is int else str(position),
             "field_names": fields,
             "rows": rows,
         }
@@ -344,13 +402,12 @@ class _Writer(object):
         self.total += len(rows)
 
     def finish(self):
-        self.request.write(json.dumps(self.streams, ensure_ascii=False))
-        finish_request(self.request)
+        return self.streams
 
 
 class _ReplicationToken(collections.namedtuple("_ReplicationToken", (
     "events", "presence", "typing", "receipts", "account_data", "backfill",
-    "push_rules", "pushers"
+    "push_rules", "pushers", "state"
 ))):
     __slots__ = []