summary refs log tree commit diff
path: root/synapse/replication/resource.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-09-23 15:31:47 +0100
committerErik Johnston <erik@matrix.org>2016-09-23 16:49:21 +0100
commit748d8fdc7bcdb43719e99a48cc74bf078f22396f (patch)
tree19ea8ddc1692448cb2818ecd42b5512014e12caa /synapse/replication/resource.py
parentMerge pull request #1136 from matrix-org/erikj/fix_signed_3pid (diff)
downloadsynapse-748d8fdc7bcdb43719e99a48cc74bf078f22396f.tar.xz
Reduce DB hits for replication
Some streams will occaisonally advance their positions without actually
having any new rows to send over federation. Currently this means that
the token will not advance on the workers, leading to them repeatedly
sending a slightly out of date token. This in turns requires the master
to hit the DB to check if there are any new rows, rather than hitting
the no op logic where we check if the given token matches the current
token.

This commit changes the API to always return an entry if the position
for a stream has changed, allowing workers to advance their tokens
correctly.
Diffstat (limited to '')
-rw-r--r--synapse/replication/resource.py139
1 files changed, 103 insertions, 36 deletions
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index 9aab3ce23c..585bd1c4ad 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -17,6 +17,7 @@ from synapse.http.servlet import parse_integer, parse_string
 from synapse.http.server import request_handler, finish_request
 from synapse.replication.pusher_resource import PusherResource
 from synapse.replication.presence_resource import PresenceResource
+from synapse.api.errors import SynapseError
 
 from twisted.web.resource import Resource
 from twisted.web.server import NOT_DONE_YET
@@ -166,7 +167,8 @@ class ReplicationResource(Resource):
         def replicate():
             return self.replicate(request_streams, limit)
 
-        result = yield self.notifier.wait_for_replication(replicate, timeout)
+        writer = yield self.notifier.wait_for_replication(replicate, timeout)
+        result = writer.finish()
 
         for stream_name, stream_content in result.items():
             logger.info(
@@ -186,6 +188,9 @@ class ReplicationResource(Resource):
         current_token = yield self.current_replication_token()
         logger.debug("Replicating up to %r", current_token)
 
+        if limit == 0:
+            raise SynapseError(400, "Limit cannot be 0")
+
         yield self.account_data(writer, current_token, limit, request_streams)
         yield self.events(writer, current_token, limit, request_streams)
         # TODO: implement limit
@@ -200,7 +205,7 @@ class ReplicationResource(Resource):
         self.streams(writer, current_token, request_streams)
 
         logger.debug("Replicated %d rows", writer.total)
-        defer.returnValue(writer.finish())
+        defer.returnValue(writer)
 
     def streams(self, writer, current_token, request_streams):
         request_token = request_streams.get("streams")
@@ -233,31 +238,52 @@ class ReplicationResource(Resource):
         request_backfill = request_streams.get("backfill")
 
         if request_events is not None or request_backfill is not None:
-            if request_events is None:
+            if request_backfill is None:
                 request_events = current_token.events
             if request_backfill is None:
                 request_backfill = current_token.backfill
+
+            no_new_tokens = (
+                request_events == current_token.events
+                and request_backfill == current_token.backfill
+            )
+            if no_new_tokens:
+                return
+
             res = yield self.store.get_all_new_events(
                 request_backfill, request_events,
                 current_token.backfill, current_token.events,
                 limit
             )
-            writer.write_header_and_rows("events", res.new_forward_events, (
-                "position", "internal", "json", "state_group"
-            ))
-            writer.write_header_and_rows("backfill", res.new_backfill_events, (
-                "position", "internal", "json", "state_group"
-            ))
+
+            upto_events_token = _position_from_rows(
+                res.new_forward_events, current_token.events
+            )
+
+            upto_backfill_token = _position_from_rows(
+                res.new_backfill_events, current_token.backfill
+            )
+
+            if request_events != upto_events_token:
+                writer.write_header_and_rows("events", res.new_forward_events, (
+                    "position", "internal", "json", "state_group"
+                ), position=upto_events_token)
+
+            if request_backfill != upto_backfill_token:
+                writer.write_header_and_rows("backfill", res.new_backfill_events, (
+                    "position", "internal", "json", "state_group",
+                ), position=upto_backfill_token)
+
             writer.write_header_and_rows(
                 "forward_ex_outliers", res.forward_ex_outliers,
-                ("position", "event_id", "state_group")
+                ("position", "event_id", "state_group"),
             )
             writer.write_header_and_rows(
                 "backward_ex_outliers", res.backward_ex_outliers,
-                ("position", "event_id", "state_group")
+                ("position", "event_id", "state_group"),
             )
             writer.write_header_and_rows(
-                "state_resets", res.state_resets, ("position",)
+                "state_resets", res.state_resets, ("position",),
             )
 
     @defer.inlineCallbacks
@@ -266,15 +292,16 @@ class ReplicationResource(Resource):
 
         request_presence = request_streams.get("presence")
 
-        if request_presence is not None:
+        if request_presence is not None and request_presence != current_position:
             presence_rows = yield self.presence_handler.get_all_presence_updates(
                 request_presence, current_position
             )
+            upto_token = _position_from_rows(presence_rows, current_position)
             writer.write_header_and_rows("presence", presence_rows, (
                 "position", "user_id", "state", "last_active_ts",
                 "last_federation_update_ts", "last_user_sync_ts",
                 "status_msg", "currently_active",
-            ))
+            ), position=upto_token)
 
     @defer.inlineCallbacks
     def typing(self, writer, current_token, request_streams):
@@ -282,7 +309,7 @@ class ReplicationResource(Resource):
 
         request_typing = request_streams.get("typing")
 
-        if request_typing is not None:
+        if request_typing is not None and request_typing != current_position:
             # If they have a higher token than current max, we can assume that
             # they had been talking to a previous instance of the master. Since
             # we reset the token on restart, the best (but hacky) thing we can
@@ -293,9 +320,10 @@ class ReplicationResource(Resource):
             typing_rows = yield self.typing_handler.get_all_typing_updates(
                 request_typing, current_position
             )
+            upto_token = _position_from_rows(typing_rows, current_position)
             writer.write_header_and_rows("typing", typing_rows, (
                 "position", "room_id", "typing"
-            ))
+            ), position=upto_token)
 
     @defer.inlineCallbacks
     def receipts(self, writer, current_token, limit, request_streams):
@@ -303,13 +331,14 @@ class ReplicationResource(Resource):
 
         request_receipts = request_streams.get("receipts")
 
-        if request_receipts is not None:
+        if request_receipts is not None and request_receipts != current_position:
             receipts_rows = yield self.store.get_all_updated_receipts(
                 request_receipts, current_position, limit
             )
+            upto_token = _position_from_rows(receipts_rows, current_position)
             writer.write_header_and_rows("receipts", receipts_rows, (
                 "position", "room_id", "receipt_type", "user_id", "event_id", "data"
-            ))
+            ), position=upto_token)
 
     @defer.inlineCallbacks
     def account_data(self, writer, current_token, limit, request_streams):
@@ -324,23 +353,36 @@ class ReplicationResource(Resource):
                 user_account_data = current_position
             if room_account_data is None:
                 room_account_data = current_position
+
+            no_new_tokens = (
+                user_account_data == current_position
+                and room_account_data == current_position
+            )
+            if no_new_tokens:
+                return
+
             user_rows, room_rows = yield self.store.get_all_updated_account_data(
                 user_account_data, room_account_data, current_position, limit
             )
+
+            upto_users_token = _position_from_rows(user_rows, current_position)
+            upto_rooms_token = _position_from_rows(room_rows, current_position)
+
             writer.write_header_and_rows("user_account_data", user_rows, (
                 "position", "user_id", "type", "content"
-            ))
+            ), position=upto_users_token)
             writer.write_header_and_rows("room_account_data", room_rows, (
                 "position", "user_id", "room_id", "type", "content"
-            ))
+            ), position=upto_rooms_token)
 
         if tag_account_data is not None:
             tag_rows = yield self.store.get_all_updated_tags(
                 tag_account_data, current_position, limit
             )
+            upto_tag_token = _position_from_rows(tag_rows, current_position)
             writer.write_header_and_rows("tag_account_data", tag_rows, (
                 "position", "user_id", "room_id", "tags"
-            ))
+            ), position=upto_tag_token)
 
     @defer.inlineCallbacks
     def push_rules(self, writer, current_token, limit, request_streams):
@@ -348,14 +390,15 @@ class ReplicationResource(Resource):
 
         push_rules = request_streams.get("push_rules")
 
-        if push_rules is not None:
+        if push_rules is not None and push_rules != current_position:
             rows = yield self.store.get_all_push_rule_updates(
                 push_rules, current_position, limit
             )
+            upto_token = _position_from_rows(rows, current_position)
             writer.write_header_and_rows("push_rules", rows, (
                 "position", "event_stream_ordering", "user_id", "rule_id", "op",
                 "priority_class", "priority", "conditions", "actions"
-            ))
+            ), position=upto_token)
 
     @defer.inlineCallbacks
     def pushers(self, writer, current_token, limit, request_streams):
@@ -363,18 +406,19 @@ class ReplicationResource(Resource):
 
         pushers = request_streams.get("pushers")
 
-        if pushers is not None:
+        if pushers is not None and pushers != current_position:
             updated, deleted = yield self.store.get_all_updated_pushers(
                 pushers, current_position, limit
             )
+            upto_token = _position_from_rows(updated, current_position)
             writer.write_header_and_rows("pushers", updated, (
                 "position", "user_id", "access_token", "profile_tag", "kind",
                 "app_id", "app_display_name", "device_display_name", "pushkey",
                 "ts", "lang", "data"
-            ))
+            ), position=upto_token)
             writer.write_header_and_rows("deleted_pushers", deleted, (
                 "position", "user_id", "app_id", "pushkey"
-            ))
+            ), position=upto_token)
 
     @defer.inlineCallbacks
     def caches(self, writer, current_token, limit, request_streams):
@@ -382,13 +426,14 @@ class ReplicationResource(Resource):
 
         caches = request_streams.get("caches")
 
-        if caches is not None:
+        if caches is not None and caches != current_position:
             updated_caches = yield self.store.get_all_updated_caches(
                 caches, current_position, limit
             )
+            upto_token = _position_from_rows(updated_caches, current_position)
             writer.write_header_and_rows("caches", updated_caches, (
                 "position", "cache_func", "keys", "invalidation_ts"
-            ))
+            ), position=upto_token)
 
     @defer.inlineCallbacks
     def to_device(self, writer, current_token, limit, request_streams):
@@ -396,13 +441,14 @@ class ReplicationResource(Resource):
 
         to_device = request_streams.get("to_device")
 
-        if to_device is not None:
+        if to_device is not None and to_device != current_position:
             to_device_rows = yield self.store.get_all_new_device_messages(
                 to_device, current_position, limit
             )
+            upto_token = _position_from_rows(to_device_rows, current_position)
             writer.write_header_and_rows("to_device", to_device_rows, (
                 "position", "user_id", "device_id", "message_json"
-            ))
+            ), position=upto_token)
 
     @defer.inlineCallbacks
     def public_rooms(self, writer, current_token, limit, request_streams):
@@ -410,13 +456,14 @@ class ReplicationResource(Resource):
 
         public_rooms = request_streams.get("public_rooms")
 
-        if public_rooms is not None:
+        if public_rooms is not None and public_rooms != current_position:
             public_rooms_rows = yield self.store.get_all_new_public_rooms(
                 public_rooms, current_position, limit
             )
+            upto_token = _position_from_rows(public_rooms_rows, current_position)
             writer.write_header_and_rows("public_rooms", public_rooms_rows, (
                 "position", "room_id", "visibility"
-            ))
+            ), position=upto_token)
 
 
 class _Writer(object):
@@ -426,11 +473,11 @@ class _Writer(object):
         self.total = 0
 
     def write_header_and_rows(self, name, rows, fields, position=None):
-        if not rows:
-            return
-
         if position is None:
-            position = rows[-1][0]
+            if rows:
+                position = rows[-1][0]
+            else:
+                return
 
         self.streams[name] = {
             "position": position if type(position) is int else str(position),
@@ -440,6 +487,9 @@ class _Writer(object):
 
         self.total += len(rows)
 
+    def __nonzero__(self):
+        return bool(self.total)
+
     def finish(self):
         return self.streams
 
@@ -461,3 +511,20 @@ class _ReplicationToken(collections.namedtuple("_ReplicationToken", (
 
     def __str__(self):
         return "_".join(str(value) for value in self)
+
+
+def _position_from_rows(rows, current_position):
+    """Calculates a position to return for a stream. Ideally we want to return the
+    position of the last row, as that will be the most correct. However, if there
+    are no rows we fall back to using the current position to stop us from
+    repeatedly hitting the storage layer unncessarily thinking there are updates.
+    (Not all advances of the token correspond to an actual update)
+
+    We can't just always return the current position, as we often limit the
+    number of rows we replicate, and so the stream may lag. The assumption is
+    that if the storage layer returns no new rows then we are not lagging and
+    we are at the `current_position`.
+    """
+    if rows:
+        return rows[-1][0]
+    return current_position