diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/replication/resource.py | 161 |
1 files changed, 109 insertions, 52 deletions
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 8c1ae0fbc7..8c2d487ff4 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -15,6 +15,8 @@ from synapse.http.servlet import parse_integer, parse_string from synapse.http.server import request_handler, finish_request +from synapse.replication.pusher_resource import PusherResource +from synapse.replication.presence_resource import PresenceResource from twisted.web.resource import Resource from twisted.web.server import NOT_DONE_YET @@ -38,6 +40,7 @@ STREAM_NAMES = ( ("backfill",), ("push_rules",), ("pushers",), + ("state",), ) @@ -76,7 +79,7 @@ class ReplicationResource(Resource): The response is a JSON object with keys for each stream with updates. Under each key is a JSON object with: - * "postion": The current position of the stream. + * "position": The current position of the stream. * "field_names": The names of the fields in each row. * "rows": The updates as an array of arrays. @@ -101,17 +104,19 @@ class ReplicationResource(Resource): long-polling this replication API for new data on those streams. """ - isLeaf = True - def __init__(self, hs): Resource.__init__(self) # Resource is old-style, so no super() self.version_string = hs.version_string self.store = hs.get_datastore() self.sources = hs.get_event_sources() - self.presence_handler = hs.get_handlers().presence_handler - self.typing_handler = hs.get_handlers().typing_notification_handler + self.presence_handler = hs.get_presence_handler() + self.typing_handler = hs.get_typing_handler() self.notifier = hs.notifier + self.clock = hs.get_clock() + + self.putChild("remove_pushers", PusherResource(hs)) + self.putChild("syncing_users", PresenceResource(hs)) def render_GET(self, request): self._async_render_GET(request) @@ -123,6 +128,7 @@ class ReplicationResource(Resource): backfill_token = yield self.store.get_current_backfill_token() push_rules_token, room_stream_token = self.store.get_push_rules_stream_token() pushers_token = self.store.get_pushers_stream_token() + state_token = self.store.get_state_stream_token() defer.returnValue(_ReplicationToken( room_stream_token, @@ -133,40 +139,62 @@ class ReplicationResource(Resource): backfill_token, push_rules_token, pushers_token, + state_token, )) - @request_handler + @request_handler() @defer.inlineCallbacks def _async_render_GET(self, request): limit = parse_integer(request, "limit", 100) timeout = parse_integer(request, "timeout", 10 * 1000) request.setHeader(b"Content-Type", b"application/json") - writer = _Writer(request) - @defer.inlineCallbacks - def replicate(): - current_token = yield self.current_replication_token() - logger.info("Replicating up to %r", current_token) + request_streams = { + name: parse_integer(request, name) + for names in STREAM_NAMES for name in names + } + request_streams["streams"] = parse_string(request, "streams") - yield self.account_data(writer, current_token, limit) - yield self.events(writer, current_token, limit) - yield self.presence(writer, current_token) # TODO: implement limit - yield self.typing(writer, current_token) # TODO: implement limit - yield self.receipts(writer, current_token, limit) - yield self.push_rules(writer, current_token, limit) - yield self.pushers(writer, current_token, limit) - self.streams(writer, current_token) + def replicate(): + return self.replicate(request_streams, limit) - logger.info("Replicated %d rows", writer.total) - defer.returnValue(writer.total) + result = yield self.notifier.wait_for_replication(replicate, timeout) - yield self.notifier.wait_for_replication(replicate, timeout) + for stream_name, stream_content in result.items(): + logger.info( + "Replicating %d rows of %s from %s -> %s", + len(stream_content["rows"]), + stream_name, + request_streams.get(stream_name), + stream_content["position"], + ) - writer.finish() + request.write(json.dumps(result, ensure_ascii=False)) + finish_request(request) - def streams(self, writer, current_token): - request_token = parse_string(writer.request, "streams") + @defer.inlineCallbacks + def replicate(self, request_streams, limit): + writer = _Writer() + current_token = yield self.current_replication_token() + logger.info("Replicating up to %r", current_token) + + yield self.account_data(writer, current_token, limit, request_streams) + yield self.events(writer, current_token, limit, request_streams) + # TODO: implement limit + yield self.presence(writer, current_token, request_streams) + yield self.typing(writer, current_token, request_streams) + yield self.receipts(writer, current_token, limit, request_streams) + yield self.push_rules(writer, current_token, limit, request_streams) + yield self.pushers(writer, current_token, limit, request_streams) + yield self.state(writer, current_token, limit, request_streams) + self.streams(writer, current_token, request_streams) + + logger.info("Replicated %d rows", writer.total) + defer.returnValue(writer.finish()) + + def streams(self, writer, current_token, request_streams): + request_token = request_streams.get("streams") streams = [] @@ -191,32 +219,43 @@ class ReplicationResource(Resource): ) @defer.inlineCallbacks - def events(self, writer, current_token, limit): - request_events = parse_integer(writer.request, "events") - request_backfill = parse_integer(writer.request, "backfill") + def events(self, writer, current_token, limit, request_streams): + request_events = request_streams.get("events") + request_backfill = request_streams.get("backfill") if request_events is not None or request_backfill is not None: if request_events is None: request_events = current_token.events if request_backfill is None: request_backfill = current_token.backfill - events_rows, backfill_rows = yield self.store.get_all_new_events( + res = yield self.store.get_all_new_events( request_backfill, request_events, current_token.backfill, current_token.events, limit ) + writer.write_header_and_rows("events", res.new_forward_events, ( + "position", "internal", "json", "state_group" + )) + writer.write_header_and_rows("backfill", res.new_backfill_events, ( + "position", "internal", "json", "state_group" + )) + writer.write_header_and_rows( + "forward_ex_outliers", res.forward_ex_outliers, + ("position", "event_id", "state_group") + ) writer.write_header_and_rows( - "events", events_rows, ("position", "internal", "json") + "backward_ex_outliers", res.backward_ex_outliers, + ("position", "event_id", "state_group") ) writer.write_header_and_rows( - "backfill", backfill_rows, ("position", "internal", "json") + "state_resets", res.state_resets, ("position",) ) @defer.inlineCallbacks - def presence(self, writer, current_token): + def presence(self, writer, current_token, request_streams): current_position = current_token.presence - request_presence = parse_integer(writer.request, "presence") + request_presence = request_streams.get("presence") if request_presence is not None: presence_rows = yield self.presence_handler.get_all_presence_updates( @@ -229,10 +268,10 @@ class ReplicationResource(Resource): )) @defer.inlineCallbacks - def typing(self, writer, current_token): + def typing(self, writer, current_token, request_streams): current_position = current_token.presence - request_typing = parse_integer(writer.request, "typing") + request_typing = request_streams.get("typing") if request_typing is not None: typing_rows = yield self.typing_handler.get_all_typing_updates( @@ -243,10 +282,10 @@ class ReplicationResource(Resource): )) @defer.inlineCallbacks - def receipts(self, writer, current_token, limit): + def receipts(self, writer, current_token, limit, request_streams): current_position = current_token.receipts - request_receipts = parse_integer(writer.request, "receipts") + request_receipts = request_streams.get("receipts") if request_receipts is not None: receipts_rows = yield self.store.get_all_updated_receipts( @@ -257,12 +296,12 @@ class ReplicationResource(Resource): )) @defer.inlineCallbacks - def account_data(self, writer, current_token, limit): + def account_data(self, writer, current_token, limit, request_streams): current_position = current_token.account_data - user_account_data = parse_integer(writer.request, "user_account_data") - room_account_data = parse_integer(writer.request, "room_account_data") - tag_account_data = parse_integer(writer.request, "tag_account_data") + user_account_data = request_streams.get("user_account_data") + room_account_data = request_streams.get("room_account_data") + tag_account_data = request_streams.get("tag_account_data") if user_account_data is not None or room_account_data is not None: if user_account_data is None: @@ -288,10 +327,10 @@ class ReplicationResource(Resource): )) @defer.inlineCallbacks - def push_rules(self, writer, current_token, limit): + def push_rules(self, writer, current_token, limit, request_streams): current_position = current_token.push_rules - push_rules = parse_integer(writer.request, "push_rules") + push_rules = request_streams.get("push_rules") if push_rules is not None: rows = yield self.store.get_all_push_rule_updates( @@ -303,10 +342,11 @@ class ReplicationResource(Resource): )) @defer.inlineCallbacks - def pushers(self, writer, current_token, limit): + def pushers(self, writer, current_token, limit, request_streams): current_position = current_token.pushers - pushers = parse_integer(writer.request, "pushers") + pushers = request_streams.get("pushers") + if pushers is not None: updated, deleted = yield self.store.get_all_updated_pushers( pushers, current_position, limit @@ -316,16 +356,34 @@ class ReplicationResource(Resource): "app_id", "app_display_name", "device_display_name", "pushkey", "ts", "lang", "data" )) - writer.write_header_and_rows("deleted", deleted, ( + writer.write_header_and_rows("deleted_pushers", deleted, ( "position", "user_id", "app_id", "pushkey" )) + @defer.inlineCallbacks + def state(self, writer, current_token, limit, request_streams): + current_position = current_token.state + + state = request_streams.get("state") + + if state is not None: + state_groups, state_group_state = ( + yield self.store.get_all_new_state_groups( + state, current_position, limit + ) + ) + writer.write_header_and_rows("state_groups", state_groups, ( + "position", "room_id", "event_id" + )) + writer.write_header_and_rows("state_group_state", state_group_state, ( + "position", "type", "state_key", "event_id" + )) + class _Writer(object): """Writes the streams as a JSON object as the response to the request""" - def __init__(self, request): + def __init__(self): self.streams = {} - self.request = request self.total = 0 def write_header_and_rows(self, name, rows, fields, position=None): @@ -336,7 +394,7 @@ class _Writer(object): position = rows[-1][0] self.streams[name] = { - "position": str(position), + "position": position if type(position) is int else str(position), "field_names": fields, "rows": rows, } @@ -344,13 +402,12 @@ class _Writer(object): self.total += len(rows) def finish(self): - self.request.write(json.dumps(self.streams, ensure_ascii=False)) - finish_request(self.request) + return self.streams class _ReplicationToken(collections.namedtuple("_ReplicationToken", ( "events", "presence", "typing", "receipts", "account_data", "backfill", - "push_rules", "pushers" + "push_rules", "pushers", "state" ))): __slots__ = [] |