summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/resource.py99
-rw-r--r--synapse/replication/slave/__init__.py14
-rw-r--r--synapse/replication/slave/storage/__init__.py14
-rw-r--r--synapse/replication/slave/storage/_base.py28
-rw-r--r--synapse/replication/slave/storage/_slaved_id_tracker.py30
-rw-r--r--synapse/replication/slave/storage/events.py199
6 files changed, 340 insertions, 44 deletions
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index c51a6fa103..a543af68f8 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -145,32 +145,43 @@ class ReplicationResource(Resource):
         timeout = parse_integer(request, "timeout", 10 * 1000)
 
         request.setHeader(b"Content-Type", b"application/json")
-        writer = _Writer(request)
 
-        @defer.inlineCallbacks
-        def replicate():
-            current_token = yield self.current_replication_token()
-            logger.info("Replicating up to %r", current_token)
-
-            yield self.account_data(writer, current_token, limit)
-            yield self.events(writer, current_token, limit)
-            yield self.presence(writer, current_token)  # TODO: implement limit
-            yield self.typing(writer, current_token)  # TODO: implement limit
-            yield self.receipts(writer, current_token, limit)
-            yield self.push_rules(writer, current_token, limit)
-            yield self.pushers(writer, current_token, limit)
-            yield self.state(writer, current_token, limit)
-            self.streams(writer, current_token)
+        request_streams = {
+            name: parse_integer(request, name)
+            for names in STREAM_NAMES for name in names
+        }
+        request_streams["streams"] = parse_string(request, "streams")
 
-            logger.info("Replicated %d rows", writer.total)
-            defer.returnValue(writer.total)
+        def replicate():
+            return self.replicate(request_streams, limit)
 
-        yield self.notifier.wait_for_replication(replicate, timeout)
+        result = yield self.notifier.wait_for_replication(replicate, timeout)
 
-        writer.finish()
+        request.write(json.dumps(result, ensure_ascii=False))
+        finish_request(request)
 
-    def streams(self, writer, current_token):
-        request_token = parse_string(writer.request, "streams")
+    @defer.inlineCallbacks
+    def replicate(self, request_streams, limit):
+        writer = _Writer()
+        current_token = yield self.current_replication_token()
+        logger.info("Replicating up to %r", current_token)
+
+        yield self.account_data(writer, current_token, limit, request_streams)
+        yield self.events(writer, current_token, limit, request_streams)
+        # TODO: implement limit
+        yield self.presence(writer, current_token, request_streams)
+        yield self.typing(writer, current_token, request_streams)
+        yield self.receipts(writer, current_token, limit, request_streams)
+        yield self.push_rules(writer, current_token, limit, request_streams)
+        yield self.pushers(writer, current_token, limit, request_streams)
+        yield self.state(writer, current_token, limit, request_streams)
+        self.streams(writer, current_token, request_streams)
+
+        logger.info("Replicated %d rows", writer.total)
+        defer.returnValue(writer.finish())
+
+    def streams(self, writer, current_token, request_streams):
+        request_token = request_streams.get("streams")
 
         streams = []
 
@@ -195,9 +206,9 @@ class ReplicationResource(Resource):
                 )
 
     @defer.inlineCallbacks
-    def events(self, writer, current_token, limit):
-        request_events = parse_integer(writer.request, "events")
-        request_backfill = parse_integer(writer.request, "backfill")
+    def events(self, writer, current_token, limit, request_streams):
+        request_events = request_streams.get("events")
+        request_backfill = request_streams.get("backfill")
 
         if request_events is not None or request_backfill is not None:
             if request_events is None:
@@ -228,10 +239,10 @@ class ReplicationResource(Resource):
             )
 
     @defer.inlineCallbacks
-    def presence(self, writer, current_token):
+    def presence(self, writer, current_token, request_streams):
         current_position = current_token.presence
 
-        request_presence = parse_integer(writer.request, "presence")
+        request_presence = request_streams.get("presence")
 
         if request_presence is not None:
             presence_rows = yield self.presence_handler.get_all_presence_updates(
@@ -244,10 +255,10 @@ class ReplicationResource(Resource):
             ))
 
     @defer.inlineCallbacks
-    def typing(self, writer, current_token):
+    def typing(self, writer, current_token, request_streams):
         current_position = current_token.presence
 
-        request_typing = parse_integer(writer.request, "typing")
+        request_typing = request_streams.get("typing")
 
         if request_typing is not None:
             typing_rows = yield self.typing_handler.get_all_typing_updates(
@@ -258,10 +269,10 @@ class ReplicationResource(Resource):
             ))
 
     @defer.inlineCallbacks
-    def receipts(self, writer, current_token, limit):
+    def receipts(self, writer, current_token, limit, request_streams):
         current_position = current_token.receipts
 
-        request_receipts = parse_integer(writer.request, "receipts")
+        request_receipts = request_streams.get("receipts")
 
         if request_receipts is not None:
             receipts_rows = yield self.store.get_all_updated_receipts(
@@ -272,12 +283,12 @@ class ReplicationResource(Resource):
             ))
 
     @defer.inlineCallbacks
-    def account_data(self, writer, current_token, limit):
+    def account_data(self, writer, current_token, limit, request_streams):
         current_position = current_token.account_data
 
-        user_account_data = parse_integer(writer.request, "user_account_data")
-        room_account_data = parse_integer(writer.request, "room_account_data")
-        tag_account_data = parse_integer(writer.request, "tag_account_data")
+        user_account_data = request_streams.get("user_account_data")
+        room_account_data = request_streams.get("room_account_data")
+        tag_account_data = request_streams.get("tag_account_data")
 
         if user_account_data is not None or room_account_data is not None:
             if user_account_data is None:
@@ -303,10 +314,10 @@ class ReplicationResource(Resource):
             ))
 
     @defer.inlineCallbacks
-    def push_rules(self, writer, current_token, limit):
+    def push_rules(self, writer, current_token, limit, request_streams):
         current_position = current_token.push_rules
 
-        push_rules = parse_integer(writer.request, "push_rules")
+        push_rules = request_streams.get("push_rules")
 
         if push_rules is not None:
             rows = yield self.store.get_all_push_rule_updates(
@@ -318,10 +329,11 @@ class ReplicationResource(Resource):
             ))
 
     @defer.inlineCallbacks
-    def pushers(self, writer, current_token, limit):
+    def pushers(self, writer, current_token, limit, request_streams):
         current_position = current_token.pushers
 
-        pushers = parse_integer(writer.request, "pushers")
+        pushers = request_streams.get("pushers")
+
         if pushers is not None:
             updated, deleted = yield self.store.get_all_updated_pushers(
                 pushers, current_position, limit
@@ -336,10 +348,11 @@ class ReplicationResource(Resource):
             ))
 
     @defer.inlineCallbacks
-    def state(self, writer, current_token, limit):
+    def state(self, writer, current_token, limit, request_streams):
         current_position = current_token.state
 
-        state = parse_integer(writer.request, "state")
+        state = request_streams.get("state")
+
         if state is not None:
             state_groups, state_group_state = (
                 yield self.store.get_all_new_state_groups(
@@ -356,9 +369,8 @@ class ReplicationResource(Resource):
 
 class _Writer(object):
     """Writes the streams as a JSON object as the response to the request"""
-    def __init__(self, request):
+    def __init__(self):
         self.streams = {}
-        self.request = request
         self.total = 0
 
     def write_header_and_rows(self, name, rows, fields, position=None):
@@ -377,8 +389,7 @@ class _Writer(object):
         self.total += len(rows)
 
     def finish(self):
-        self.request.write(json.dumps(self.streams, ensure_ascii=False))
-        finish_request(self.request)
+        return self.streams
 
 
 class _ReplicationToken(collections.namedtuple("_ReplicationToken", (
diff --git a/synapse/replication/slave/__init__.py b/synapse/replication/slave/__init__.py
new file mode 100644
index 0000000000..b7df13c9ee
--- /dev/null
+++ b/synapse/replication/slave/__init__.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/synapse/replication/slave/storage/__init__.py b/synapse/replication/slave/storage/__init__.py
new file mode 100644
index 0000000000..b7df13c9ee
--- /dev/null
+++ b/synapse/replication/slave/storage/__init__.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
new file mode 100644
index 0000000000..46e43ce1c7
--- /dev/null
+++ b/synapse/replication/slave/storage/_base.py
@@ -0,0 +1,28 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage._base import SQLBaseStore
+from twisted.internet import defer
+
+
+class BaseSlavedStore(SQLBaseStore):
+    def __init__(self, db_conn, hs):
+        super(BaseSlavedStore, self).__init__(hs)
+
+    def stream_positions(self):
+        return {}
+
+    def process_replication(self, result):
+        return defer.succeed(None)
diff --git a/synapse/replication/slave/storage/_slaved_id_tracker.py b/synapse/replication/slave/storage/_slaved_id_tracker.py
new file mode 100644
index 0000000000..24b5c79d4a
--- /dev/null
+++ b/synapse/replication/slave/storage/_slaved_id_tracker.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage.util.id_generators import _load_current_id
+
+
+class SlavedIdTracker(object):
+    def __init__(self, db_conn, table, column, extra_tables=[], step=1):
+        self.step = step
+        self._current = _load_current_id(db_conn, table, column, step)
+        for table, column in extra_tables:
+            self.advance(_load_current_id(db_conn, table, column))
+
+    def advance(self, new_id):
+        self._current = (max if self.step > 0 else min)(self._current, new_id)
+
+    def get_current_token(self):
+        return self._current
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
new file mode 100644
index 0000000000..680dc89536
--- /dev/null
+++ b/synapse/replication/slave/storage/events.py
@@ -0,0 +1,199 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+
+from synapse.api.constants import EventTypes
+from synapse.events import FrozenEvent
+from synapse.storage import DataStore
+from synapse.storage.room import RoomStore
+from synapse.storage.roommember import RoomMemberStore
+from synapse.storage.event_federation import EventFederationStore
+from synapse.storage.state import StateStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+import ujson as json
+
+# So, um, we want to borrow a load of functions intended for reading from
+# a DataStore, but we don't want to take functions that either write to the
+# DataStore or are cached and don't have cache invalidation logic.
+#
+# Rather than write duplicate versions of those functions, or lift them to
+# a common base class, we going to grab the underlying __func__ object from
+# the method descriptor on the DataStore and chuck them into our class.
+
+
+class SlavedEventStore(BaseSlavedStore):
+
+    def __init__(self, db_conn, hs):
+        super(SlavedEventStore, self).__init__(db_conn, hs)
+        self._stream_id_gen = SlavedIdTracker(
+            db_conn, "events", "stream_ordering",
+        )
+        self._backfill_id_gen = SlavedIdTracker(
+            db_conn, "events", "stream_ordering", step=-1
+        )
+        events_max = self._stream_id_gen.get_current_token()
+        event_cache_prefill, min_event_val = self._get_cache_dict(
+            db_conn, "events",
+            entity_column="room_id",
+            stream_column="stream_ordering",
+            max_value=events_max,
+        )
+        self._events_stream_cache = StreamChangeCache(
+            "EventsRoomStreamChangeCache", min_event_val,
+            prefilled_cache=event_cache_prefill,
+        )
+
+    # Cached functions can't be accessed through a class instance so we need
+    # to reach inside the __dict__ to extract them.
+    get_room_name_and_aliases = RoomStore.__dict__["get_room_name_and_aliases"]
+    get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"]
+    get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"]
+    get_latest_event_ids_in_room = EventFederationStore.__dict__[
+        "get_latest_event_ids_in_room"
+    ]
+    _get_current_state_for_key = StateStore.__dict__[
+        "_get_current_state_for_key"
+    ]
+
+    get_current_state = DataStore.get_current_state.__func__
+    get_current_state_for_key = DataStore.get_current_state_for_key.__func__
+    get_rooms_for_user_where_membership_is = (
+        DataStore.get_rooms_for_user_where_membership_is.__func__
+    )
+    get_membership_changes_for_user = (
+        DataStore.get_membership_changes_for_user.__func__
+    )
+    get_room_events_max_id = DataStore.get_room_events_max_id.__func__
+    get_room_events_stream_for_room = (
+        DataStore.get_room_events_stream_for_room.__func__
+    )
+    _set_before_and_after = DataStore._set_before_and_after
+
+    _get_events = DataStore._get_events.__func__
+    _get_events_from_cache = DataStore._get_events_from_cache.__func__
+
+    _invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__
+    _parse_events_txn = DataStore._parse_events_txn.__func__
+    _get_events_txn = DataStore._get_events_txn.__func__
+    _fetch_events_txn = DataStore._fetch_events_txn.__func__
+    _fetch_event_rows = DataStore._fetch_event_rows.__func__
+    _get_event_from_row_txn = DataStore._get_event_from_row_txn.__func__
+    _get_rooms_for_user_where_membership_is_txn = (
+        DataStore._get_rooms_for_user_where_membership_is_txn.__func__
+    )
+    _get_members_rows_txn = DataStore._get_members_rows_txn.__func__
+
+    def stream_positions(self):
+        result = super(SlavedEventStore, self).stream_positions()
+        result["events"] = self._stream_id_gen.get_current_token()
+        result["backfilled"] = self._backfill_id_gen.get_current_token()
+        return result
+
+    def process_replication(self, result):
+        state_resets = set(
+            r[0] for r in result.get("state_resets", {"rows": []})["rows"]
+        )
+
+        stream = result.get("events")
+        if stream:
+            self._stream_id_gen.advance(stream["position"])
+            for row in stream["rows"]:
+                self._process_replication_row(
+                    row, backfilled=False, state_resets=state_resets
+                )
+
+        stream = result.get("backfill")
+        if stream:
+            self._backfill_id_gen.advance(stream["position"])
+            for row in stream["rows"]:
+                self._process_replication_row(
+                    row, backfilled=True, state_resets=state_resets
+                )
+
+        stream = result.get("forward_ex_outliers")
+        if stream:
+            for row in stream["rows"]:
+                event_id = row[1]
+                self._invalidate_get_event_cache(event_id)
+
+        stream = result.get("backward_ex_outliers")
+        if stream:
+            for row in stream["rows"]:
+                event_id = row[1]
+                self._invalidate_get_event_cache(event_id)
+
+        return super(SlavedEventStore, self).process_replication(result)
+
+    def _process_replication_row(self, row, backfilled, state_resets):
+        position = row[0]
+        internal = json.loads(row[1])
+        event_json = json.loads(row[2])
+
+        event = FrozenEvent(event_json, internal_metadata_dict=internal)
+        self._invalidate_caches_for_event(
+            event, backfilled, reset_state=position in state_resets
+        )
+
+    def _invalidate_caches_for_event(self, event, backfilled, reset_state):
+        if reset_state:
+            self._get_current_state_for_key.invalidate_all()
+            self.get_rooms_for_user.invalidate_all()
+            self.get_users_in_room.invalidate((event.room_id,))
+            # self.get_joined_hosts_for_room.invalidate((event.room_id,))
+            self.get_room_name_and_aliases.invalidate((event.room_id,))
+
+        self._invalidate_get_event_cache(event.event_id)
+
+        if not backfilled:
+            self._events_stream_cache.entity_has_changed(
+                event.room_id, event.internal_metadata.stream_ordering
+            )
+
+        # self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
+        #     (event.room_id,)
+        # )
+
+        if event.type == EventTypes.Redaction:
+            self._invalidate_get_event_cache(event.redacts)
+
+        if event.type == EventTypes.Member:
+            self.get_rooms_for_user.invalidate((event.state_key,))
+            # self.get_joined_hosts_for_room.invalidate((event.room_id,))
+            self.get_users_in_room.invalidate((event.room_id,))
+            # self._membership_stream_cache.entity_has_changed(
+            #    event.state_key, event.internal_metadata.stream_ordering
+            # )
+
+        if not event.is_state():
+            return
+
+        if backfilled:
+            return
+
+        if (not event.internal_metadata.is_invite_from_remote()
+                and event.internal_metadata.is_outlier()):
+            return
+
+        self._get_current_state_for_key.invalidate((
+            event.room_id, event.type, event.state_key
+        ))
+
+        if event.type in [EventTypes.Name, EventTypes.Aliases]:
+            self.get_room_name_and_aliases.invalidate(
+                (event.room_id,)
+            )
+            pass