From 60a0f81c7a2da86bf959227a440e3f7a2b727bb5 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 1 Mar 2016 14:49:41 +0000 Subject: Add a /replication API for extracting the updates that happened on synapse This is necessary for replicating the data in synapse to be visible to a separate service because presence and typing notifications aren't stored in a database so won't be visible to another process. This API can be used to either get the raw data by requesting the tables themselves or to just receive notifications for updates by following the streams meta-stream. Returns updates for each table requested a JSON array of arrays with a row for each row in the table. Each table is prefixed by a header row with the: name of the table, current stream_id position for the table, number of rows, number of columns and the names of the columns. This is followed by the rows that have been added to the server since the requester last asked. The API has a timeout and is hooked up to the notifier so that a slave can long poll for updates. --- synapse/replication/resource.py | 320 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 320 insertions(+) create mode 100644 synapse/replication/resource.py (limited to 'synapse/replication/resource.py') diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py new file mode 100644 index 0000000000..e0d039518d --- /dev/null +++ b/synapse/replication/resource.py @@ -0,0 +1,320 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.http.servlet import parse_integer, parse_string +from synapse.http.server import request_handler, finish_request + +from twisted.web.resource import Resource +from twisted.web.server import NOT_DONE_YET +from twisted.internet import defer + +import ujson as json + +import collections +import logging + +logger = logging.getLogger(__name__) + +REPLICATION_PREFIX = "/_synapse/replication" + +STREAM_NAMES = ( + ("events",), + ("presence",), + ("typing",), + ("receipts",), + ("user_account_data", "room_account_data", "tag_account_data",), + ("backfill",), +) + + +class ReplicationResource(Resource): + """ + HTTP endpoint for extracting data from synapse. + + The streams of data returned by the endpoint are controlled by the + parameters given to the API. To return a given stream pass a query + parameter with a position in the stream to return data from or the + special value "-1" to return data from the start of the stream. + + If there is no data for any of the supplied streams after the given + position then the request will block until there is data for one + of the streams. This allows clients to long-poll this API. + + The possible streams are: + + * "streams": A special stream returing the positions of other streams. + * "events": The new events seen on the server. + * "presence": Presence updates. + * "typing": Typing updates. + * "receipts": Receipt updates. + * "user_account_data": Top-level per user account data. 
+ * "room_account_data: Per room per user account data. + * "tag_account_data": Per room per user tags. + * "backfill": Old events that have been backfilled from other servers. + + The API takes two additional query parameters: + + * "timeout": How long to wait before returning an empty response. + * "limit": The maximum number of rows to return for the selected streams. + + The response is a JSON object with keys for each stream with updates. Under + each key is a JSON object with: + + * "postion": The current position of the stream. + * "field_names": The names of the fields in each row. + * "rows": The updates as an array of arrays. + + There are a number of ways this API could be used: + + 1) To replicate the contents of the backing database to another database. + 2) To be notified when the contents of a shared backing database changes. + 3) To "tail" the activity happening on a server for debugging. + + In the first case the client would track all of the streams and store it's + own copy of the data. + + In the second case the client might theoretically just be able to follow + the "streams" stream to track where the other streams are. However in + practise it will probably need to get the contents of the streams in + order to expire the any in-memory caches. Whether it gets the contents + of the streams from this replication API or directly from the backing + store is a matter of taste. + + In the third case the client would use the "streams" stream to find what + streams are available and their current positions. Then it can start + long-polling this replication API for new data on those streams. + """ + + isLeaf = True + + def __init__(self, hs): + Resource.__init__(self) # Resource is old-style, so no super() + + self.version_string = hs.version_string + self.store = hs.get_datastore() + self.sources = hs.get_event_sources() + self.presence_handler = hs.get_handlers().presence_handler + self.typing_handler = hs.get_handlers().typing_notification_handler + self.notifier = hs.notifier + + def render_GET(self, request): + self._async_render_GET(request) + return NOT_DONE_YET + + @defer.inlineCallbacks + def current_replication_token(self): + stream_token = yield self.sources.get_current_token() + backfill_token = yield self.store.get_current_backfill_token() + + defer.returnValue(_ReplicationToken( + stream_token.room_stream_id, + int(stream_token.presence_key), + int(stream_token.typing_key), + int(stream_token.receipt_key), + int(stream_token.account_data_key), + backfill_token, + )) + + @request_handler + @defer.inlineCallbacks + def _async_render_GET(self, request): + limit = parse_integer(request, "limit", 100) + timeout = parse_integer(request, "timeout", 10 * 1000) + + request.setHeader(b"Content-Type", b"application/json") + writer = _Writer(request) + + @defer.inlineCallbacks + def replicate(): + current_token = yield self.current_replication_token() + logger.info("Replicating up to %r", current_token) + + yield self.account_data(writer, current_token, limit) + yield self.events(writer, current_token, limit) + yield self.presence(writer, current_token) # TODO: implement limit + yield self.typing(writer, current_token) # TODO: implement limit + yield self.receipts(writer, current_token, limit) + self.streams(writer, current_token) + + logger.info("Replicated %d rows", writer.total) + defer.returnValue(writer.total) + + yield self.notifier.wait_for_replication(replicate, timeout) + + writer.finish() + + def streams(self, writer, current_token): + request_token = 
parse_string(writer.request, "streams") + + streams = [] + + if request_token is not None: + if request_token == "-1": + for names, position in zip(STREAM_NAMES, current_token): + streams.extend((name, position) for name in names) + else: + items = zip( + STREAM_NAMES, + current_token, + _ReplicationToken(request_token) + ) + for names, current_id, last_id in items: + if last_id < current_id: + streams.extend((name, current_id) for name in names) + + if streams: + writer.write_header_and_rows( + "streams", streams, ("name", "position"), + position=str(current_token) + ) + + @defer.inlineCallbacks + def events(self, writer, current_token, limit): + request_events = parse_integer(writer.request, "events") + request_backfill = parse_integer(writer.request, "backfill") + + if request_events is not None or request_backfill is not None: + if request_events is None: + request_events = current_token.events + if request_backfill is None: + request_backfill = current_token.backfill + events_rows, backfill_rows = yield self.store.get_all_new_events( + request_backfill, request_events, + current_token.backfill, current_token.events, + limit + ) + writer.write_header_and_rows( + "events", events_rows, ("position", "internal", "json") + ) + writer.write_header_and_rows( + "backfill", backfill_rows, ("position", "internal", "json") + ) + + @defer.inlineCallbacks + def presence(self, writer, current_token): + current_position = current_token.presence + + request_presence = parse_integer(writer.request, "presence") + + if request_presence is not None: + presence_rows = yield self.presence_handler.get_all_presence_updates( + request_presence, current_position + ) + writer.write_header_and_rows("presence", presence_rows, ( + "position", "user_id", "state", "last_active_ts", + "last_federation_update_ts", "last_user_sync_ts", + "status_msg", "currently_active", + )) + + @defer.inlineCallbacks + def typing(self, writer, current_token): + current_position = current_token.presence + + request_typing = parse_integer(writer.request, "typing") + + if request_typing is not None: + typing_rows = yield self.typing_handler.get_all_typing_updates( + request_typing, current_position + ) + writer.write_header_and_rows("typing", typing_rows, ( + "position", "room_id", "typing" + )) + + @defer.inlineCallbacks + def receipts(self, writer, current_token, limit): + current_position = current_token.receipts + + request_receipts = parse_integer(writer.request, "receipts") + + if request_receipts is not None: + receipts_rows = yield self.store.get_all_updated_receipts( + request_receipts, current_position, limit + ) + writer.write_header_and_rows("receipts", receipts_rows, ( + "position", "room_id", "receipt_type", "user_id", "event_id", "data" + )) + + @defer.inlineCallbacks + def account_data(self, writer, current_token, limit): + current_position = current_token.account_data + + user_account_data = parse_integer(writer.request, "user_account_data") + room_account_data = parse_integer(writer.request, "room_account_data") + tag_account_data = parse_integer(writer.request, "tag_account_data") + + if user_account_data is not None or room_account_data is not None: + if user_account_data is None: + user_account_data = current_position + if room_account_data is None: + room_account_data = current_position + user_rows, room_rows = yield self.store.get_all_updated_account_data( + user_account_data, room_account_data, current_position, limit + ) + writer.write_header_and_rows("user_account_data", user_rows, ( + "position", "user_id", 
"type", "content" + )) + writer.write_header_and_rows("room_account_data", room_rows, ( + "position", "user_id", "room_id", "type", "content" + )) + + if tag_account_data is not None: + tag_rows = yield self.store.get_all_updated_tags( + tag_account_data, current_position, limit + ) + writer.write_header_and_rows("tag_account_data", tag_rows, ( + "position", "user_id", "room_id", "tags" + )) + + +class _Writer(object): + """Writes the streams as a JSON object as the response to the request""" + def __init__(self, request): + self.streams = {} + self.request = request + self.total = 0 + + def write_header_and_rows(self, name, rows, fields, position=None): + if not rows: + return + + if position is None: + position = rows[-1][0] + + self.streams[name] = { + "position": str(position), + "field_names": fields, + "rows": rows, + } + + self.total += len(rows) + + def finish(self): + self.request.write(json.dumps(self.streams, ensure_ascii=False)) + finish_request(self.request) + + +class _ReplicationToken(collections.namedtuple("_ReplicationToken", ( + "events", "presence", "typing", "receipts", "account_data", "backfill", +))): + __slots__ = [] + + def __new__(cls, *args): + if len(args) == 1: + return cls(*(int(value) for value in args[0].split("_"))) + else: + return super(_ReplicationToken, cls).__new__(cls, *args) + + def __str__(self): + return "_".join(str(value) for value in self) -- cgit 1.5.1 From 2223204ebaf7624f4d640f2c56d3a4eb7ff6d98e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 2 Mar 2016 17:26:20 +0000 Subject: Hook push rules up to the replication API --- synapse/replication/resource.py | 28 ++++++++++++++++++++++++++-- synapse/storage/push_rule.py | 6 ++++++ tests/replication/test_resource.py | 6 ++++-- 3 files changed, 36 insertions(+), 4 deletions(-) (limited to 'synapse/replication/resource.py') diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index e0d039518d..15b7898a45 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -36,6 +36,7 @@ STREAM_NAMES = ( ("receipts",), ("user_account_data", "room_account_data", "tag_account_data",), ("backfill",), + ("push_rules",), ) @@ -63,6 +64,7 @@ class ReplicationResource(Resource): * "room_account_data: Per room per user account data. * "tag_account_data": Per room per user tags. * "backfill": Old events that have been backfilled from other servers. + * "push_rules": Per user changes to push rules. 
The API takes two additional query parameters: @@ -117,14 +119,16 @@ class ReplicationResource(Resource): def current_replication_token(self): stream_token = yield self.sources.get_current_token() backfill_token = yield self.store.get_current_backfill_token() + push_rules_token, room_stream_token = self.store.get_push_rules_stream_token() defer.returnValue(_ReplicationToken( - stream_token.room_stream_id, + room_stream_token, int(stream_token.presence_key), int(stream_token.typing_key), int(stream_token.receipt_key), int(stream_token.account_data_key), backfill_token, + push_rules_token, )) @request_handler @@ -146,6 +150,7 @@ class ReplicationResource(Resource): yield self.presence(writer, current_token) # TODO: implement limit yield self.typing(writer, current_token) # TODO: implement limit yield self.receipts(writer, current_token, limit) + yield self.push_rules(writer, current_token, limit) self.streams(writer, current_token) logger.info("Replicated %d rows", writer.total) @@ -277,6 +282,21 @@ class ReplicationResource(Resource): "position", "user_id", "room_id", "tags" )) + @defer.inlineCallbacks + def push_rules(self, writer, current_token, limit): + current_position = current_token.push_rules + + push_rules = parse_integer(writer.request, "push_rules") + + if push_rules is not None: + rows = yield self.store.get_all_push_rule_updates( + push_rules, current_position, limit + ) + writer.write_header_and_rows("push_rules", rows, ( + "position", "stream_ordering", "user_id", "rule_id", "op", + "priority_class", "priority", "conditions", "actions" + )) + class _Writer(object): """Writes the streams as a JSON object as the response to the request""" @@ -307,12 +327,16 @@ class _Writer(object): class _ReplicationToken(collections.namedtuple("_ReplicationToken", ( "events", "presence", "typing", "receipts", "account_data", "backfill", + "push_rules" ))): __slots__ = [] def __new__(cls, *args): if len(args) == 1: - return cls(*(int(value) for value in args[0].split("_"))) + streams = [int(value) for value in args[0].split("_")] + if len(streams) < len(cls._fields): + streams.extend([0] * (len(cls._fields) - len(streams))) + return cls(*streams) else: return super(_ReplicationToken, cls).__new__(cls, *args) diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index f3ebd49492..e034024108 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -412,6 +412,12 @@ class PushRuleStore(SQLBaseStore): "get_all_push_rule_updates", get_all_push_rule_updates_txn ) + def get_push_rules_stream_token(self): + """Get the position of the push rules stream. 
+ Returns a pair of a stream id for the push_rules stream and the + room stream ordering it corresponds to.""" + return self._push_rules_stream_id_gen.get_max_token() + class RuleNotFoundException(Exception): pass diff --git a/tests/replication/test_resource.py b/tests/replication/test_resource.py index 38daaf87e2..a30d59a865 100644 --- a/tests/replication/test_resource.py +++ b/tests/replication/test_resource.py @@ -35,7 +35,8 @@ class ReplicationResourceCase(unittest.TestCase): "send_message", ]), ) - self.user = UserID.from_string("@seeing:red") + self.user_id = "@seeing:red" + self.user = UserID.from_string(self.user_id) self.hs.get_ratelimiter().send_message.return_value = (True, 0) @@ -101,7 +102,7 @@ class ReplicationResourceCase(unittest.TestCase): event_id = yield self.send_text_message(room_id, "Hello, World") get = self.get(receipts="-1") yield self.hs.get_handlers().receipts_handler.received_client_receipt( - room_id, "m.read", self.user.to_string(), event_id + room_id, "m.read", self.user_id, event_id ) code, body = yield get self.assertEquals(code, 200) @@ -129,6 +130,7 @@ class ReplicationResourceCase(unittest.TestCase): test_timeout_room_account_data = _test_timeout("room_account_data") test_timeout_tag_account_data = _test_timeout("tag_account_data") test_timeout_backfill = _test_timeout("backfill") + test_timeout_push_rules = _test_timeout("push_rules") @defer.inlineCallbacks def send_text_message(self, room_id, message): -- cgit 1.5.1 From ebcbb23226904f080e6a9c1e2f2901886c286445 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 4 Mar 2016 16:15:23 +0000 Subject: s/stream_ordering/event_stream_ordering/ in push --- synapse/replication/resource.py | 2 +- synapse/storage/push_rule.py | 54 ++++++++++++---------- .../storage/schema/delta/30/push_rule_stream.sql | 2 +- 3 files changed, 31 insertions(+), 27 deletions(-) (limited to 'synapse/replication/resource.py') diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 15b7898a45..adc1eb1d0b 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -293,7 +293,7 @@ class ReplicationResource(Resource): push_rules, current_position, limit ) writer.write_header_and_rows("push_rules", rows, ( - "position", "stream_ordering", "user_id", "rule_id", "op", + "position", "event_stream_ordering", "user_id", "rule_id", "op", "priority_class", "priority", "conditions", "actions" )) diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 57e1ca5509..9dbad2fd5f 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -106,24 +106,25 @@ class PushRuleStore(SQLBaseStore): ): conditions_json = json.dumps(conditions) actions_json = json.dumps(actions) - with self._push_rules_stream_id_gen.get_next() as (stream_id, stream_ordering): + with self._push_rules_stream_id_gen.get_next() as ids: + stream_id, event_stream_ordering = ids if before or after: yield self.runInteraction( "_add_push_rule_relative_txn", self._add_push_rule_relative_txn, - stream_id, stream_ordering, user_id, rule_id, priority_class, + stream_id, event_stream_ordering, user_id, rule_id, priority_class, conditions_json, actions_json, before, after, ) else: yield self.runInteraction( "_add_push_rule_highest_priority_txn", self._add_push_rule_highest_priority_txn, - stream_id, stream_ordering, user_id, rule_id, priority_class, + stream_id, event_stream_ordering, user_id, rule_id, priority_class, conditions_json, actions_json, ) def _add_push_rule_relative_txn( - self, txn, 
stream_id, stream_ordering, user_id, rule_id, priority_class, + self, txn, stream_id, event_stream_ordering, user_id, rule_id, priority_class, conditions_json, actions_json, before, after ): # Lock the table since otherwise we'll have annoying races between the @@ -175,12 +176,12 @@ class PushRuleStore(SQLBaseStore): txn.execute(sql, (user_id, priority_class, new_rule_priority)) self._upsert_push_rule_txn( - txn, stream_id, stream_ordering, user_id, rule_id, priority_class, + txn, stream_id, event_stream_ordering, user_id, rule_id, priority_class, new_rule_priority, conditions_json, actions_json, ) def _add_push_rule_highest_priority_txn( - self, txn, stream_id, stream_ordering, user_id, rule_id, priority_class, + self, txn, stream_id, event_stream_ordering, user_id, rule_id, priority_class, conditions_json, actions_json ): # Lock the table since otherwise we'll have annoying races between the @@ -202,12 +203,12 @@ class PushRuleStore(SQLBaseStore): self._upsert_push_rule_txn( txn, - stream_id, stream_ordering, user_id, rule_id, priority_class, new_prio, + stream_id, event_stream_ordering, user_id, rule_id, priority_class, new_prio, conditions_json, actions_json, ) def _upsert_push_rule_txn( - self, txn, stream_id, stream_ordering, user_id, rule_id, priority_class, + self, txn, stream_id, event_stream_ordering, user_id, rule_id, priority_class, priority, conditions_json, actions_json, update_stream=True ): """Specialised version of _simple_upsert_txn that picks a push_rule_id @@ -245,7 +246,7 @@ class PushRuleStore(SQLBaseStore): if update_stream: self._insert_push_rules_update_txn( - txn, stream_id, stream_ordering, user_id, rule_id, + txn, stream_id, event_stream_ordering, user_id, rule_id, op="ADD", data={ "priority_class": priority_class, @@ -266,7 +267,7 @@ class PushRuleStore(SQLBaseStore): user_id (str): The matrix ID of the push rule owner rule_id (str): The rule_id of the rule to be deleted """ - def delete_push_rule_txn(txn, stream_id, stream_ordering): + def delete_push_rule_txn(txn, stream_id, event_stream_ordering): self._simple_delete_one_txn( txn, "push_rules", @@ -274,26 +275,28 @@ class PushRuleStore(SQLBaseStore): ) self._insert_push_rules_update_txn( - txn, stream_id, stream_ordering, user_id, rule_id, + txn, stream_id, event_stream_ordering, user_id, rule_id, op="DELETE" ) - with self._push_rules_stream_id_gen.get_next() as (stream_id, stream_ordering): + with self._push_rules_stream_id_gen.get_next() as ids: + stream_id, event_stream_ordering = ids yield self.runInteraction( - "delete_push_rule", delete_push_rule_txn, stream_id, stream_ordering + "delete_push_rule", delete_push_rule_txn, stream_id, event_stream_ordering ) @defer.inlineCallbacks def set_push_rule_enabled(self, user_id, rule_id, enabled): - with self._push_rules_stream_id_gen.get_next() as (stream_id, stream_ordering): + with self._push_rules_stream_id_gen.get_next() as ids: + stream_id, event_stream_ordering = ids yield self.runInteraction( "_set_push_rule_enabled_txn", self._set_push_rule_enabled_txn, - stream_id, stream_ordering, user_id, rule_id, enabled + stream_id, event_stream_ordering, user_id, rule_id, enabled ) def _set_push_rule_enabled_txn( - self, txn, stream_id, stream_ordering, user_id, rule_id, enabled + self, txn, stream_id, event_stream_ordering, user_id, rule_id, enabled ): new_id = self._push_rules_enable_id_gen.get_next() self._simple_upsert_txn( @@ -305,7 +308,7 @@ class PushRuleStore(SQLBaseStore): ) self._insert_push_rules_update_txn( - txn, stream_id, stream_ordering, user_id, 
rule_id, + txn, stream_id, event_stream_ordering, user_id, rule_id, op="ENABLE" if enabled else "DISABLE" ) @@ -313,14 +316,14 @@ class PushRuleStore(SQLBaseStore): def set_push_rule_actions(self, user_id, rule_id, actions, is_default_rule): actions_json = json.dumps(actions) - def set_push_rule_actions_txn(txn, stream_id, stream_ordering): + def set_push_rule_actions_txn(txn, stream_id, event_stream_ordering): if is_default_rule: # Add a dummy rule to the rules table with the user specified # actions. priority_class = -1 priority = 1 self._upsert_push_rule_txn( - txn, stream_id, stream_ordering, user_id, rule_id, + txn, stream_id, event_stream_ordering, user_id, rule_id, priority_class, priority, "[]", actions_json, update_stream=False ) @@ -333,22 +336,23 @@ class PushRuleStore(SQLBaseStore): ) self._insert_push_rules_update_txn( - txn, stream_id, stream_ordering, user_id, rule_id, + txn, stream_id, event_stream_ordering, user_id, rule_id, op="ACTIONS", data={"actions": actions_json} ) - with self._push_rules_stream_id_gen.get_next() as (stream_id, stream_ordering): + with self._push_rules_stream_id_gen.get_next() as ids: + stream_id, event_stream_ordering = ids yield self.runInteraction( "set_push_rule_actions", set_push_rule_actions_txn, - stream_id, stream_ordering + stream_id, event_stream_ordering ) def _insert_push_rules_update_txn( - self, txn, stream_id, stream_ordering, user_id, rule_id, op, data=None + self, txn, stream_id, event_stream_ordering, user_id, rule_id, op, data=None ): values = { "stream_id": stream_id, - "stream_ordering": stream_ordering, + "event_stream_ordering": event_stream_ordering, "user_id": user_id, "rule_id": rule_id, "op": op, @@ -372,7 +376,7 @@ class PushRuleStore(SQLBaseStore): """Get all the push rules changes that have happend on the server""" def get_all_push_rule_updates_txn(txn): sql = ( - "SELECT stream_id, stream_ordering, user_id, rule_id," + "SELECT stream_id, event_stream_ordering, user_id, rule_id," " op, priority_class, priority, conditions, actions" " FROM push_rules_stream" " WHERE ? < stream_id AND stream_id <= ?" 
diff --git a/synapse/storage/schema/delta/30/push_rule_stream.sql b/synapse/storage/schema/delta/30/push_rule_stream.sql index e8418bb35f..735aa8d5f6 100644 --- a/synapse/storage/schema/delta/30/push_rule_stream.sql +++ b/synapse/storage/schema/delta/30/push_rule_stream.sql @@ -17,7 +17,7 @@ CREATE TABLE push_rules_stream( stream_id BIGINT NOT NULL, - stream_ordering BIGINT NOT NULL, + event_stream_ordering BIGINT NOT NULL, user_id TEXT NOT NULL, rule_id TEXT NOT NULL, op TEXT NOT NULL, -- One of "ENABLE", "DISABLE", "ACTIONS", "ADD", "DELETE" -- cgit 1.5.1 From b6e8420aeed9921ba7d0fd4c8ebaf1b64d5f677c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 15 Mar 2016 17:01:43 +0000 Subject: Add replication stream for pushers --- synapse/replication/resource.py | 25 ++++++++- synapse/storage/__init__.py | 5 +- synapse/storage/pusher.py | 63 ++++++++++++++++------ .../storage/schema/delta/30/deleted_pushers.sql | 24 +++++++++ synapse/storage/util/id_generators.py | 7 ++- tests/replication/test_resource.py | 1 + 6 files changed, 107 insertions(+), 18 deletions(-) create mode 100644 synapse/storage/schema/delta/30/deleted_pushers.sql (limited to 'synapse/replication/resource.py') diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index adc1eb1d0b..8c1ae0fbc7 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -37,6 +37,7 @@ STREAM_NAMES = ( ("user_account_data", "room_account_data", "tag_account_data",), ("backfill",), ("push_rules",), + ("pushers",), ) @@ -65,6 +66,7 @@ class ReplicationResource(Resource): * "tag_account_data": Per room per user tags. * "backfill": Old events that have been backfilled from other servers. * "push_rules": Per user changes to push rules. + * "pushers": Per user changes to their pushers. 
The API takes two additional query parameters: @@ -120,6 +122,7 @@ class ReplicationResource(Resource): stream_token = yield self.sources.get_current_token() backfill_token = yield self.store.get_current_backfill_token() push_rules_token, room_stream_token = self.store.get_push_rules_stream_token() + pushers_token = self.store.get_pushers_stream_token() defer.returnValue(_ReplicationToken( room_stream_token, @@ -129,6 +132,7 @@ class ReplicationResource(Resource): int(stream_token.account_data_key), backfill_token, push_rules_token, + pushers_token, )) @request_handler @@ -151,6 +155,7 @@ class ReplicationResource(Resource): yield self.typing(writer, current_token) # TODO: implement limit yield self.receipts(writer, current_token, limit) yield self.push_rules(writer, current_token, limit) + yield self.pushers(writer, current_token, limit) self.streams(writer, current_token) logger.info("Replicated %d rows", writer.total) @@ -297,6 +302,24 @@ class ReplicationResource(Resource): "priority_class", "priority", "conditions", "actions" )) + @defer.inlineCallbacks + def pushers(self, writer, current_token, limit): + current_position = current_token.pushers + + pushers = parse_integer(writer.request, "pushers") + if pushers is not None: + updated, deleted = yield self.store.get_all_updated_pushers( + pushers, current_position, limit + ) + writer.write_header_and_rows("pushers", updated, ( + "position", "user_id", "access_token", "profile_tag", "kind", + "app_id", "app_display_name", "device_display_name", "pushkey", + "ts", "lang", "data" + )) + writer.write_header_and_rows("deleted", deleted, ( + "position", "user_id", "app_id", "pushkey" + )) + class _Writer(object): """Writes the streams as a JSON object as the response to the request""" @@ -327,7 +350,7 @@ class _Writer(object): class _ReplicationToken(collections.namedtuple("_ReplicationToken", ( "events", "presence", "typing", "receipts", "account_data", "backfill", - "push_rules" + "push_rules", "pushers" ))): __slots__ = [] diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 168eb27b03..250ba536ea 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -119,12 +119,15 @@ class DataStore(RoomMemberStore, RoomStore, self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id") self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id") self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id") - self._pushers_id_gen = IdGenerator(db_conn, "pushers", "id") self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id") self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id") self._push_rules_stream_id_gen = ChainedIdGenerator( self._stream_id_gen, db_conn, "push_rules_stream", "stream_id" ) + self._pushers_id_gen = StreamIdGenerator( + db_conn, "pushers", "id", + extra_tables=[("deleted_pushers", "stream_id")], + ) events_max = self._stream_id_gen.get_max_token() event_cache_prefill, min_event_val = self._get_cache_dict( diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 7693ab9082..29da3bbd13 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -16,8 +16,6 @@ from ._base import SQLBaseStore from twisted.internet import defer -from synapse.api.errors import StoreError - from canonicaljson import encode_canonical_json import logging @@ -79,12 +77,41 @@ class PusherStore(SQLBaseStore): rows = yield self.runInteraction("get_all_pushers", get_pushers) defer.returnValue(rows) + def 
get_pushers_stream_token(self): + return self._pushers_id_gen.get_max_token() + + def get_all_updated_pushers(self, last_id, current_id, limit): + def get_all_updated_pushers_txn(txn): + sql = ( + "SELECT id, user_name, access_token, profile_tag, kind," + " app_id, app_display_name, device_display_name, pushkey, ts," + " lang, data" + " FROM pushers" + " WHERE ? < id AND id <= ?" + " ORDER BY id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, limit)) + updated = txn.fetchall() + + sql = ( + "SELECT stream_id, user_id, app_id, pushkey" + " FROM deleted_pushers" + " WHERE ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, limit)) + deleted = txn.fetchall() + + return (updated, deleted) + return self.runInteraction( + "get_all_updated_pushers", get_all_updated_pushers_txn + ) + @defer.inlineCallbacks def add_pusher(self, user_id, access_token, kind, app_id, app_display_name, device_display_name, pushkey, pushkey_ts, lang, data, profile_tag=""): - try: - next_id = self._pushers_id_gen.get_next() + with self._pushers_id_gen.get_next() as stream_id: yield self._simple_upsert( "pushers", dict( @@ -101,23 +128,29 @@ class PusherStore(SQLBaseStore): lang=lang, data=encode_canonical_json(data), profile_tag=profile_tag, - ), - insertion_values=dict( - id=next_id, + id=stream_id, ), desc="add_pusher", ) - except Exception as e: - logger.error("create_pusher with failed: %s", e) - raise StoreError(500, "Problem creating pusher.") @defer.inlineCallbacks def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): - yield self._simple_delete_one( - "pushers", - {"app_id": app_id, "pushkey": pushkey, 'user_name': user_id}, - desc="delete_pusher_by_app_id_pushkey_user_id", - ) + def delete_pusher_txn(txn, stream_id): + self._simple_delete_one( + txn, + "pushers", + {"app_id": app_id, "pushkey": pushkey, "user_name": user_id} + ) + self._simple_upsert_txn( + txn, + "deleted_pushers", + {"app_id": app_id, "pushkey": pushkey, "user_id": user_id}, + {"stream_id", stream_id}, + ) + with self._pushers_id_gen.get_next() as stream_id: + yield self.runInteraction( + "delete_pusher", delete_pusher_txn, stream_id + ) @defer.inlineCallbacks def update_pusher_last_token(self, app_id, pushkey, user_id, last_token): diff --git a/synapse/storage/schema/delta/30/deleted_pushers.sql b/synapse/storage/schema/delta/30/deleted_pushers.sql new file mode 100644 index 0000000000..cdcf79ac81 --- /dev/null +++ b/synapse/storage/schema/delta/30/deleted_pushers.sql @@ -0,0 +1,24 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +CREATE TABLE IF NOT EXISTS deleted_pushers( + stream_id BIGINT NOT NULL, + app_id TEXT NOT NULL, + pushkey TEXT NOT NULL, + user_id TEXT NOT NULL, + UNIQUE (app_id, pushkey, user_id) +); + +CREATE INDEX deleted_pushers_stream_id ON deleted_pushers (stream_id); diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 610ddad423..a02dfc7d58 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -49,9 +49,14 @@ class StreamIdGenerator(object): with stream_id_gen.get_next() as stream_id: # ... persist event ... """ - def __init__(self, db_conn, table, column): + def __init__(self, db_conn, table, column, extra_tables=[]): self._lock = threading.Lock() self._current_max = _load_max_id(db_conn, table, column) + for table, column in extra_tables: + self._current_max = max( + self._current_max, + _load_max_id(db_conn, table, column) + ) self._unfinished_ids = deque() def get_next(self): diff --git a/tests/replication/test_resource.py b/tests/replication/test_resource.py index 4a42eb3365..f4b5fb3328 100644 --- a/tests/replication/test_resource.py +++ b/tests/replication/test_resource.py @@ -131,6 +131,7 @@ class ReplicationResourceCase(unittest.TestCase): test_timeout_tag_account_data = _test_timeout("tag_account_data") test_timeout_backfill = _test_timeout("backfill") test_timeout_push_rules = _test_timeout("push_rules") + test_timeout_pushers = _test_timeout("pushers") @defer.inlineCallbacks def send_text_message(self, room_id, message): -- cgit 1.5.1 From adafa24b0a8f539c114c7d45f36f7b62743557f6 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 25 Mar 2016 23:38:19 +0000 Subject: typo --- synapse/replication/resource.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/replication/resource.py') diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 8c1ae0fbc7..37a1d3960c 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -76,7 +76,7 @@ class ReplicationResource(Resource): The response is a JSON object with keys for each stream with updates. Under each key is a JSON object with: - * "postion": The current position of the stream. + * "position": The current position of the stream. * "field_names": The names of the fields in each row. * "rows": The updates as an array of arrays. -- cgit 1.5.1
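
Taken together, the commits above describe a long-pollable HTTP endpoint under `/_synapse/replication`: each query parameter names a stream and gives the position to resume from (or "-1" for the start), and the JSON response maps each updated stream to its "position", "field_names" and "rows". The sketch below shows how a downstream process might consume it. It is illustrative only: the URL prefix, query parameters, "streams" meta-stream and response shape come from the patches in this log, while the host/port, the use of the `requests` library and the `handle_rows` helper are assumptions, not part of the original commits.

    # Minimal sketch of a client for the replication API described above.
    # BASE_URL's host/port, the `requests` dependency and handle_rows() are
    # assumptions; the query parameters, the "streams" meta-stream and the
    # response shape are taken from the patches in this log.
    import requests

    BASE_URL = "http://localhost:8008/_synapse/replication"  # assumed address

    def poll_replication(positions, timeout_ms=10000, limit=100):
        """Long-poll the replication API from the given stream positions.

        `positions` maps stream names (e.g. "events", "push_rules") to the
        last position seen, or "-1" to start from the beginning of a stream.
        """
        params = dict(positions)
        params["timeout"] = timeout_ms
        params["limit"] = limit
        response = requests.get(BASE_URL, params=params)
        response.raise_for_status()
        return response.json()

    def handle_rows(stream_name, field_names, rows):
        # Placeholder: a real consumer would update caches or a local
        # database here instead of printing.
        for row in rows:
            print(stream_name, dict(zip(field_names, row)))

    if __name__ == "__main__":
        # First ask the "streams" meta-stream where every other stream is.
        result = poll_replication({"streams": "-1"})
        stream_rows = result.get("streams", {"rows": []})["rows"]
        # Each row is (name, position); begin long-polling those streams.
        positions = {name: str(position) for name, position in stream_rows}

        while True:
            result = poll_replication(positions)
            for name, stream in result.items():
                handle_rows(name, stream["field_names"], stream["rows"])
                # "position" is the point to resume from on the next request.
                positions[name] = stream["position"]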
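
The `_ReplicationToken` helper serialises every stream position into one underscore-separated string, and, after the push_rules commit, pads short tokens with zeros so that tokens minted before a new stream existed still parse. A small self-contained illustration of that behaviour, mirroring the final version of the class in this log (the position numbers are made up):

    # Illustration of the underscore-separated replication token format.
    # The numbers are invented; the parsing and zero-padding behaviour mirrors
    # the _ReplicationToken namedtuple as it stands after these commits.
    import collections

    class ReplicationToken(collections.namedtuple("ReplicationToken", (
        "events", "presence", "typing", "receipts", "account_data",
        "backfill", "push_rules", "pushers",
    ))):
        __slots__ = []

        def __new__(cls, *args):
            if len(args) == 1:
                # Parse "100_5_3_7_2_0"-style strings, padding missing
                # trailing streams with 0 so older, shorter tokens stay valid.
                streams = [int(value) for value in args[0].split("_")]
                streams.extend([0] * (len(cls._fields) - len(streams)))
                return cls(*streams)
            return super(ReplicationToken, cls).__new__(cls, *args)

        def __str__(self):
            return "_".join(str(value) for value in self)

    # A token issued before the push_rules and pushers streams were added:
    token = ReplicationToken("100_5_3_7_2_0")
    print(token.events)      # 100
    print(token.push_rules)  # 0 (padded)
    print(str(token))        # "100_5_3_7_2_0_0_0"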
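
One detail of the pushers commit worth calling out: deleted pushers are written under a separate top-level "deleted" key in the response, not inside the "pushers" entry. A consumer therefore has to look at both keys. The sketch below is a hedged illustration of that handling; the `apply_pusher` and `remove_pusher` callbacks are placeholders and not part of synapse, while the field names match the `write_header_and_rows` calls in the patch.

    # Sketch of consuming the "pushers" stream from the response JSON.
    # Deleted pushers arrive under a separate top-level "deleted" key, as
    # written in the patch above; apply_pusher/remove_pusher are placeholders.
    def process_pusher_updates(response_json, apply_pusher, remove_pusher):
        pushers = response_json.get("pushers")
        if pushers is not None:
            for row in pushers["rows"]:
                apply_pusher(dict(zip(pushers["field_names"], row)))

        deleted = response_json.get("deleted")
        if deleted is not None:
            # Rows are (position, user_id, app_id, pushkey).
            for _position, user_id, app_id, pushkey in deleted["rows"]:
                remove_pusher(user_id, app_id, pushkey)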