diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index b378b41646..1d7a607529 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -13,10 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import send_event
-
from synapse.http.server import JsonResource
+from synapse.replication.http import membership, send_event
REPLICATION_PREFIX = "/_synapse/replication"
@@ -29,3 +27,4 @@ class ReplicationRestResource(JsonResource):
def register_servlets(self, hs):
send_event.register_servlets(hs, self)
+ membership.register_servlets(hs, self)
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
new file mode 100644
index 0000000000..e66c4e881f
--- /dev/null
+++ b/synapse/replication/http/membership.py
@@ -0,0 +1,334 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import re
+
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError, MatrixCodeMessageException
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.types import Requester, UserID
+from synapse.util.distributor import user_left_room, user_joined_room
+
+logger = logging.getLogger(__name__)
+
+
+@defer.inlineCallbacks
+def remote_join(client, host, port, requester, remote_room_hosts,
+ room_id, user_id, content):
+ """Ask the master to do a remote join for the given user to the given room
+
+ Args:
+ client (SimpleHttpClient)
+ host (str): host of master
+ port (int): port on master listening for HTTP replication
+ requester (Requester)
+ remote_room_hosts (list[str]): Servers to try and join via
+ room_id (str)
+ user_id (str)
+ content (dict): The event content to use for the join event
+
+ Returns:
+ Deferred
+ """
+ uri = "http://%s:%s/_synapse/replication/remote_join" % (host, port)
+
+ payload = {
+ "requester": requester.serialize(),
+ "remote_room_hosts": remote_room_hosts,
+ "room_id": room_id,
+ "user_id": user_id,
+ "content": content,
+ }
+
+ try:
+ result = yield client.post_json_get_json(uri, payload)
+ except MatrixCodeMessageException as e:
+ # We convert to SynapseError as we know that it was a SynapseError
+ # on the master process that we should send to the client. (And
+ # importantly, not stack traces everywhere)
+ raise SynapseError(e.code, e.msg, e.errcode)
+ defer.returnValue(result)
+
+
+@defer.inlineCallbacks
+def remote_reject_invite(client, host, port, requester, remote_room_hosts,
+ room_id, user_id):
+ """Ask master to reject the invite for the user and room.
+
+ Args:
+ client (SimpleHttpClient)
+ host (str): host of master
+ port (int): port on master listening for HTTP replication
+ requester (Requester)
+ remote_room_hosts (list[str]): Servers to try and reject via
+ room_id (str)
+ user_id (str)
+
+ Returns:
+ Deferred
+ """
+ uri = "http://%s:%s/_synapse/replication/remote_reject_invite" % (host, port)
+
+ payload = {
+ "requester": requester.serialize(),
+ "remote_room_hosts": remote_room_hosts,
+ "room_id": room_id,
+ "user_id": user_id,
+ }
+
+ try:
+ result = yield client.post_json_get_json(uri, payload)
+ except MatrixCodeMessageException as e:
+ # We convert to SynapseError as we know that it was a SynapseError
+ # on the master process that we should send to the client. (And
+ # importantly, not stack traces everywhere)
+ raise SynapseError(e.code, e.msg, e.errcode)
+ defer.returnValue(result)
+
+
+@defer.inlineCallbacks
+def get_or_register_3pid_guest(client, host, port, requester,
+ medium, address, inviter_user_id):
+ """Ask the master to get/create a guest account for given 3PID.
+
+ Args:
+ client (SimpleHttpClient)
+ host (str): host of master
+ port (int): port on master listening for HTTP replication
+ requester (Requester)
+ medium (str)
+ address (str)
+ inviter_user_id (str): The user ID who is trying to invite the
+ 3PID
+
+ Returns:
+ Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the
+ 3PID guest account.
+ """
+
+ uri = "http://%s:%s/_synapse/replication/get_or_register_3pid_guest" % (host, port)
+
+ payload = {
+ "requester": requester.serialize(),
+ "medium": medium,
+ "address": address,
+ "inviter_user_id": inviter_user_id,
+ }
+
+ try:
+ result = yield client.post_json_get_json(uri, payload)
+ except MatrixCodeMessageException as e:
+ # We convert to SynapseError as we know that it was a SynapseError
+ # on the master process that we should send to the client. (And
+ # importantly, not stack traces everywhere)
+ raise SynapseError(e.code, e.msg, e.errcode)
+ defer.returnValue(result)
+
+
+@defer.inlineCallbacks
+def notify_user_membership_change(client, host, port, user_id, room_id, change):
+ """Notify master that a user has joined or left the room
+
+ Args:
+ client (SimpleHttpClient)
+ host (str): host of master
+ port (int): port on master listening for HTTP replication.
+ user_id (str)
+ room_id (str)
+ change (str): Either "join" or "left"
+
+ Returns:
+ Deferred
+ """
+ assert change in ("joined", "left")
+
+ uri = "http://%s:%s/_synapse/replication/user_%s_room" % (host, port, change)
+
+ payload = {
+ "user_id": user_id,
+ "room_id": room_id,
+ }
+
+ try:
+ result = yield client.post_json_get_json(uri, payload)
+ except MatrixCodeMessageException as e:
+ # We convert to SynapseError as we know that it was a SynapseError
+ # on the master process that we should send to the client. (And
+ # importantly, not stack traces everywhere)
+ raise SynapseError(e.code, e.msg, e.errcode)
+ defer.returnValue(result)
+
+
+class ReplicationRemoteJoinRestServlet(RestServlet):
+ PATTERNS = [re.compile("^/_synapse/replication/remote_join$")]
+
+ def __init__(self, hs):
+ super(ReplicationRemoteJoinRestServlet, self).__init__()
+
+ self.federation_handler = hs.get_handlers().federation_handler
+ self.store = hs.get_datastore()
+ self.clock = hs.get_clock()
+
+ @defer.inlineCallbacks
+ def on_POST(self, request):
+ content = parse_json_object_from_request(request)
+
+ remote_room_hosts = content["remote_room_hosts"]
+ room_id = content["room_id"]
+ user_id = content["user_id"]
+ event_content = content["content"]
+
+ requester = Requester.deserialize(self.store, content["requester"])
+
+ if requester.user:
+ request.authenticated_entity = requester.user.to_string()
+
+ logger.info(
+ "remote_join: %s into room: %s",
+ user_id, room_id,
+ )
+
+ yield self.federation_handler.do_invite_join(
+ remote_room_hosts,
+ room_id,
+ user_id,
+ event_content,
+ )
+
+ defer.returnValue((200, {}))
+
+
+class ReplicationRemoteRejectInviteRestServlet(RestServlet):
+ PATTERNS = [re.compile("^/_synapse/replication/remote_reject_invite$")]
+
+ def __init__(self, hs):
+ super(ReplicationRemoteRejectInviteRestServlet, self).__init__()
+
+ self.federation_handler = hs.get_handlers().federation_handler
+ self.store = hs.get_datastore()
+ self.clock = hs.get_clock()
+
+ @defer.inlineCallbacks
+ def on_POST(self, request):
+ content = parse_json_object_from_request(request)
+
+ remote_room_hosts = content["remote_room_hosts"]
+ room_id = content["room_id"]
+ user_id = content["user_id"]
+
+ requester = Requester.deserialize(self.store, content["requester"])
+
+ if requester.user:
+ request.authenticated_entity = requester.user.to_string()
+
+ logger.info(
+ "remote_reject_invite: %s out of room: %s",
+ user_id, room_id,
+ )
+
+ try:
+ event = yield self.federation_handler.do_remotely_reject_invite(
+ remote_room_hosts,
+ room_id,
+ user_id,
+ )
+ ret = event.get_pdu_json()
+ except Exception as e:
+ # if we were unable to reject the exception, just mark
+ # it as rejected on our end and plough ahead.
+ #
+ # The 'except' clause is very broad, but we need to
+ # capture everything from DNS failures upwards
+ #
+ logger.warn("Failed to reject invite: %s", e)
+
+ yield self.store.locally_reject_invite(
+ user_id, room_id
+ )
+ ret = {}
+
+ defer.returnValue((200, ret))
+
+
+class ReplicationRegister3PIDGuestRestServlet(RestServlet):
+ PATTERNS = [re.compile("^/_synapse/replication/get_or_register_3pid_guest$")]
+
+ def __init__(self, hs):
+ super(ReplicationRegister3PIDGuestRestServlet, self).__init__()
+
+ self.registeration_handler = hs.get_handlers().registration_handler
+ self.store = hs.get_datastore()
+ self.clock = hs.get_clock()
+
+ @defer.inlineCallbacks
+ def on_POST(self, request):
+ content = parse_json_object_from_request(request)
+
+ medium = content["medium"]
+ address = content["address"]
+ inviter_user_id = content["inviter_user_id"]
+
+ requester = Requester.deserialize(self.store, content["requester"])
+
+ if requester.user:
+ request.authenticated_entity = requester.user.to_string()
+
+ logger.info("get_or_register_3pid_guest: %r", content)
+
+ ret = yield self.registeration_handler.get_or_register_3pid_guest(
+ medium, address, inviter_user_id,
+ )
+
+ defer.returnValue((200, ret))
+
+
+class ReplicationUserJoinedLeftRoomRestServlet(RestServlet):
+ PATTERNS = [re.compile("^/_synapse/replication/user_(?P<change>joined|left)_room$")]
+
+ def __init__(self, hs):
+ super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__()
+
+ self.registeration_handler = hs.get_handlers().registration_handler
+ self.store = hs.get_datastore()
+ self.clock = hs.get_clock()
+ self.distributor = hs.get_distributor()
+
+ def on_POST(self, request, change):
+ content = parse_json_object_from_request(request)
+
+ user_id = content["user_id"]
+ room_id = content["room_id"]
+
+ logger.info("user membership change: %s in %s", user_id, room_id)
+
+ user = UserID.from_string(user_id)
+
+ if change == "joined":
+ user_joined_room(self.distributor, user, room_id)
+ elif change == "left":
+ user_left_room(self.distributor, user, room_id)
+ else:
+ raise Exception("Unrecognized change: %r", change)
+
+ return (200, {})
+
+
+def register_servlets(hs, http_server):
+ ReplicationRemoteJoinRestServlet(hs).register(http_server)
+ ReplicationRemoteRejectInviteRestServlet(hs).register(http_server)
+ ReplicationRegister3PIDGuestRestServlet(hs).register(http_server)
+ ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index 468f4b68f4..bbe2f967b7 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -15,12 +15,17 @@
from twisted.internet import defer
-from synapse.api.errors import SynapseError, MatrixCodeMessageException
+from synapse.api.errors import (
+ SynapseError, MatrixCodeMessageException, CodeMessageException,
+)
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.util.async import sleep
+from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.metrics import Measure
-from synapse.types import Requester
+from synapse.types import Requester, UserID
import logging
import re
@@ -29,7 +34,8 @@ logger = logging.getLogger(__name__)
@defer.inlineCallbacks
-def send_event_to_master(client, host, port, requester, event, context):
+def send_event_to_master(client, host, port, requester, event, context,
+ ratelimit, extra_users):
"""Send event to be handled on the master
Args:
@@ -39,8 +45,12 @@ def send_event_to_master(client, host, port, requester, event, context):
requester (Requester)
event (FrozenEvent)
context (EventContext)
+ ratelimit (bool)
+ extra_users (list(UserID)): Any extra users to notify about event
"""
- uri = "http://%s:%s/_synapse/replication/send_event" % (host, port,)
+ uri = "http://%s:%s/_synapse/replication/send_event/%s" % (
+ host, port, event.event_id,
+ )
payload = {
"event": event.get_pdu_json(),
@@ -48,10 +58,27 @@ def send_event_to_master(client, host, port, requester, event, context):
"rejected_reason": event.rejected_reason,
"context": context.serialize(event),
"requester": requester.serialize(),
+ "ratelimit": ratelimit,
+ "extra_users": [u.to_string() for u in extra_users],
}
try:
- result = yield client.post_json_get_json(uri, payload)
+ # We keep retrying the same request for timeouts. This is so that we
+ # have a good idea that the request has either succeeded or failed on
+ # the master, and so whether we should clean up or not.
+ while True:
+ try:
+ result = yield client.put_json(uri, payload)
+ break
+ except CodeMessageException as e:
+ if e.code != 504:
+ raise
+
+ logger.warn("send_event request timed out")
+
+ # If we timed out we probably don't need to worry about backing
+ # off too much, but lets just wait a little anyway.
+ yield sleep(1)
except MatrixCodeMessageException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
@@ -66,7 +93,7 @@ class ReplicationSendEventRestServlet(RestServlet):
The API looks like:
- POST /_synapse/replication/send_event
+ POST /_synapse/replication/send_event/:event_id
{
"event": { .. serialized event .. },
@@ -74,9 +101,11 @@ class ReplicationSendEventRestServlet(RestServlet):
"rejected_reason": .., // The event.rejected_reason field
"context": { .. serialized event context .. },
"requester": { .. serialized requester .. },
+ "ratelimit": true,
+ "extra_users": [],
}
"""
- PATTERNS = [re.compile("^/_synapse/replication/send_event$")]
+ PATTERNS = [re.compile("^/_synapse/replication/send_event/(?P<event_id>[^/]+)$")]
def __init__(self, hs):
super(ReplicationSendEventRestServlet, self).__init__()
@@ -85,8 +114,23 @@ class ReplicationSendEventRestServlet(RestServlet):
self.store = hs.get_datastore()
self.clock = hs.get_clock()
+ # The responses are tiny, so we may as well cache them for a while
+ self.response_cache = ResponseCache(hs, timeout_ms=30 * 60 * 1000)
+
+ def on_PUT(self, request, event_id):
+ result = self.response_cache.get(event_id)
+ if not result:
+ result = self.response_cache.set(
+ event_id,
+ self._handle_request(request)
+ )
+ else:
+ logger.warn("Returning cached response")
+ return make_deferred_yieldable(result)
+
+ @preserve_fn
@defer.inlineCallbacks
- def on_POST(self, request):
+ def _handle_request(self, request):
with Measure(self.clock, "repl_send_event_parse"):
content = parse_json_object_from_request(request)
@@ -98,6 +142,9 @@ class ReplicationSendEventRestServlet(RestServlet):
requester = Requester.deserialize(self.store, content["requester"])
context = yield EventContext.deserialize(self.store, content["context"])
+ ratelimit = content["ratelimit"]
+ extra_users = [UserID.from_string(u) for u in content["extra_users"]]
+
if requester.user:
request.authenticated_entity = requester.user.to_string()
@@ -106,8 +153,10 @@ class ReplicationSendEventRestServlet(RestServlet):
event.event_id, event.room_id,
)
- yield self.event_creation_handler.handle_new_client_event(
+ yield self.event_creation_handler.persist_and_notify_client_event(
requester, event, context,
+ ratelimit=ratelimit,
+ extra_users=extra_users,
)
defer.returnValue((200, {}))
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index efbd87918e..d9ba6d69b1 100644
--- a/synapse/replication/slave/storage/account_data.py
+++ b/synapse/replication/slave/storage/account_data.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,50 +14,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import BaseSlavedStore
-from ._slaved_id_tracker import SlavedIdTracker
-from synapse.storage import DataStore
-from synapse.storage.account_data import AccountDataStore
-from synapse.storage.tags import TagsStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
+from synapse.storage.account_data import AccountDataWorkerStore
+from synapse.storage.tags import TagsWorkerStore
-class SlavedAccountDataStore(BaseSlavedStore):
+class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlavedStore):
def __init__(self, db_conn, hs):
- super(SlavedAccountDataStore, self).__init__(db_conn, hs)
self._account_data_id_gen = SlavedIdTracker(
db_conn, "account_data_max_stream_id", "stream_id",
)
- self._account_data_stream_cache = StreamChangeCache(
- "AccountDataAndTagsChangeCache",
- self._account_data_id_gen.get_current_token(),
- )
-
- get_account_data_for_user = (
- AccountDataStore.__dict__["get_account_data_for_user"]
- )
-
- get_global_account_data_by_type_for_users = (
- AccountDataStore.__dict__["get_global_account_data_by_type_for_users"]
- )
- get_global_account_data_by_type_for_user = (
- AccountDataStore.__dict__["get_global_account_data_by_type_for_user"]
- )
-
- get_tags_for_user = TagsStore.__dict__["get_tags_for_user"]
- get_tags_for_room = (
- DataStore.get_tags_for_room.__func__
- )
- get_account_data_for_room = (
- DataStore.get_account_data_for_room.__func__
- )
-
- get_updated_tags = DataStore.get_updated_tags.__func__
- get_updated_account_data_for_user = (
- DataStore.get_updated_account_data_for_user.__func__
- )
+ super(SlavedAccountDataStore, self).__init__(db_conn, hs)
def get_max_account_data_stream_id(self):
return self._account_data_id_gen.get_current_token()
@@ -85,6 +56,10 @@ class SlavedAccountDataStore(BaseSlavedStore):
(row.data_type, row.user_id,)
)
self.get_account_data_for_user.invalidate((row.user_id,))
+ self.get_account_data_for_room.invalidate((row.user_id, row.room_id,))
+ self.get_account_data_for_room_and_type.invalidate(
+ (row.user_id, row.room_id, row.data_type,),
+ )
self._account_data_stream_cache.entity_has_changed(
row.user_id, token
)
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
index 0d3f31a50c..8cae3076f4 100644
--- a/synapse/replication/slave/storage/appservice.py
+++ b/synapse/replication/slave/storage/appservice.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,33 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import BaseSlavedStore
-from synapse.storage import DataStore
-from synapse.config.appservice import load_appservices
-from synapse.storage.appservice import _make_exclusive_regex
+from synapse.storage.appservice import (
+ ApplicationServiceWorkerStore, ApplicationServiceTransactionWorkerStore,
+)
-class SlavedApplicationServiceStore(BaseSlavedStore):
- def __init__(self, db_conn, hs):
- super(SlavedApplicationServiceStore, self).__init__(db_conn, hs)
- self.services_cache = load_appservices(
- hs.config.server_name,
- hs.config.app_service_config_files
- )
- self.exclusive_user_regex = _make_exclusive_regex(self.services_cache)
-
- get_app_service_by_token = DataStore.get_app_service_by_token.__func__
- get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__
- get_app_services = DataStore.get_app_services.__func__
- get_new_events_for_appservice = DataStore.get_new_events_for_appservice.__func__
- create_appservice_txn = DataStore.create_appservice_txn.__func__
- get_appservices_by_state = DataStore.get_appservices_by_state.__func__
- get_oldest_unsent_txn = DataStore.get_oldest_unsent_txn.__func__
- _get_last_txn = DataStore._get_last_txn.__func__
- complete_appservice_txn = DataStore.complete_appservice_txn.__func__
- get_appservice_state = DataStore.get_appservice_state.__func__
- set_appservice_last_pos = DataStore.set_appservice_last_pos.__func__
- set_appservice_state = DataStore.set_appservice_state.__func__
- get_if_app_services_interested_in_user = (
- DataStore.get_if_app_services_interested_in_user.__func__
- )
+class SlavedApplicationServiceStore(ApplicationServiceTransactionWorkerStore,
+ ApplicationServiceWorkerStore):
+ pass
diff --git a/synapse/replication/slave/storage/directory.py b/synapse/replication/slave/storage/directory.py
index 7301d885f2..6deecd3963 100644
--- a/synapse/replication/slave/storage/directory.py
+++ b/synapse/replication/slave/storage/directory.py
@@ -14,10 +14,8 @@
# limitations under the License.
from ._base import BaseSlavedStore
-from synapse.storage.directory import DirectoryStore
+from synapse.storage.directory import DirectoryWorkerStore
-class DirectoryStore(BaseSlavedStore):
- get_aliases_for_room = DirectoryStore.__dict__[
- "get_aliases_for_room"
- ]
+class DirectoryStore(DirectoryWorkerStore, BaseSlavedStore):
+ pass
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index f8c164b48b..b1f64ef0d8 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,14 +16,13 @@
import logging
from synapse.api.constants import EventTypes
-from synapse.storage import DataStore
-from synapse.storage.event_federation import EventFederationStore
-from synapse.storage.event_push_actions import EventPushActionsStore
-from synapse.storage.roommember import RoomMemberStore
+from synapse.storage.event_federation import EventFederationWorkerStore
+from synapse.storage.event_push_actions import EventPushActionsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
+from synapse.storage.roommember import RoomMemberWorkerStore
from synapse.storage.state import StateGroupWorkerStore
-from synapse.storage.stream import StreamStore
-from synapse.storage.signatures import SignatureStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.storage.stream import StreamWorkerStore
+from synapse.storage.signatures import SignatureWorkerStore
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
@@ -38,157 +38,33 @@ logger = logging.getLogger(__name__)
# the method descriptor on the DataStore and chuck them into our class.
-class SlavedEventStore(StateGroupWorkerStore, BaseSlavedStore):
+class SlavedEventStore(EventFederationWorkerStore,
+ RoomMemberWorkerStore,
+ EventPushActionsWorkerStore,
+ StreamWorkerStore,
+ EventsWorkerStore,
+ StateGroupWorkerStore,
+ SignatureWorkerStore,
+ BaseSlavedStore):
def __init__(self, db_conn, hs):
- super(SlavedEventStore, self).__init__(db_conn, hs)
self._stream_id_gen = SlavedIdTracker(
db_conn, "events", "stream_ordering",
)
self._backfill_id_gen = SlavedIdTracker(
db_conn, "events", "stream_ordering", step=-1
)
- events_max = self._stream_id_gen.get_current_token()
- event_cache_prefill, min_event_val = self._get_cache_dict(
- db_conn, "events",
- entity_column="room_id",
- stream_column="stream_ordering",
- max_value=events_max,
- )
- self._events_stream_cache = StreamChangeCache(
- "EventsRoomStreamChangeCache", min_event_val,
- prefilled_cache=event_cache_prefill,
- )
- self._membership_stream_cache = StreamChangeCache(
- "MembershipStreamChangeCache", events_max,
- )
- self.stream_ordering_month_ago = 0
- self._stream_order_on_start = self.get_room_max_stream_ordering()
+ super(SlavedEventStore, self).__init__(db_conn, hs)
# Cached functions can't be accessed through a class instance so we need
# to reach inside the __dict__ to extract them.
- get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"]
- get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"]
- get_hosts_in_room = RoomMemberStore.__dict__["get_hosts_in_room"]
- get_users_who_share_room_with_user = (
- RoomMemberStore.__dict__["get_users_who_share_room_with_user"]
- )
- get_latest_event_ids_in_room = EventFederationStore.__dict__[
- "get_latest_event_ids_in_room"
- ]
- get_invited_rooms_for_user = RoomMemberStore.__dict__[
- "get_invited_rooms_for_user"
- ]
- get_unread_event_push_actions_by_room_for_user = (
- EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
- )
- _get_unread_counts_by_receipt_txn = (
- DataStore._get_unread_counts_by_receipt_txn.__func__
- )
- _get_unread_counts_by_pos_txn = (
- DataStore._get_unread_counts_by_pos_txn.__func__
- )
- get_recent_event_ids_for_room = (
- StreamStore.__dict__["get_recent_event_ids_for_room"]
- )
- _get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"]
- has_room_changed_since = DataStore.has_room_changed_since.__func__
-
- get_unread_push_actions_for_user_in_range_for_http = (
- DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__
- )
- get_unread_push_actions_for_user_in_range_for_email = (
- DataStore.get_unread_push_actions_for_user_in_range_for_email.__func__
- )
- get_push_action_users_in_range = (
- DataStore.get_push_action_users_in_range.__func__
- )
- get_event = DataStore.get_event.__func__
- get_events = DataStore.get_events.__func__
- get_rooms_for_user_where_membership_is = (
- DataStore.get_rooms_for_user_where_membership_is.__func__
- )
- get_membership_changes_for_user = (
- DataStore.get_membership_changes_for_user.__func__
- )
- get_room_events_max_id = DataStore.get_room_events_max_id.__func__
- get_room_events_stream_for_room = (
- DataStore.get_room_events_stream_for_room.__func__
- )
- get_events_around = DataStore.get_events_around.__func__
- get_joined_users_from_state = DataStore.get_joined_users_from_state.__func__
- get_joined_users_from_context = DataStore.get_joined_users_from_context.__func__
- _get_joined_users_from_context = (
- RoomMemberStore.__dict__["_get_joined_users_from_context"]
- )
-
- get_joined_hosts = DataStore.get_joined_hosts.__func__
- _get_joined_hosts = RoomMemberStore.__dict__["_get_joined_hosts"]
-
- get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__
- get_room_events_stream_for_rooms = (
- DataStore.get_room_events_stream_for_rooms.__func__
- )
- is_host_joined = RoomMemberStore.__dict__["is_host_joined"]
- get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__
-
- _set_before_and_after = staticmethod(DataStore._set_before_and_after)
-
- _get_events = DataStore._get_events.__func__
- _get_events_from_cache = DataStore._get_events_from_cache.__func__
-
- _invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__
- _enqueue_events = DataStore._enqueue_events.__func__
- _do_fetch = DataStore._do_fetch.__func__
- _fetch_event_rows = DataStore._fetch_event_rows.__func__
- _get_event_from_row = DataStore._get_event_from_row.__func__
- _get_rooms_for_user_where_membership_is_txn = (
- DataStore._get_rooms_for_user_where_membership_is_txn.__func__
- )
- _get_events_around_txn = DataStore._get_events_around_txn.__func__
-
- get_backfill_events = DataStore.get_backfill_events.__func__
- _get_backfill_events = DataStore._get_backfill_events.__func__
- get_missing_events = DataStore.get_missing_events.__func__
- _get_missing_events = DataStore._get_missing_events.__func__
-
- get_auth_chain = DataStore.get_auth_chain.__func__
- get_auth_chain_ids = DataStore.get_auth_chain_ids.__func__
- _get_auth_chain_ids_txn = DataStore._get_auth_chain_ids_txn.__func__
-
- get_room_max_stream_ordering = DataStore.get_room_max_stream_ordering.__func__
-
- get_forward_extremeties_for_room = (
- DataStore.get_forward_extremeties_for_room.__func__
- )
- _get_forward_extremeties_for_room = (
- EventFederationStore.__dict__["_get_forward_extremeties_for_room"]
- )
-
- get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__
-
- get_federation_out_pos = DataStore.get_federation_out_pos.__func__
- update_federation_out_pos = DataStore.update_federation_out_pos.__func__
-
- get_latest_event_ids_and_hashes_in_room = (
- DataStore.get_latest_event_ids_and_hashes_in_room.__func__
- )
- _get_latest_event_ids_and_hashes_in_room = (
- DataStore._get_latest_event_ids_and_hashes_in_room.__func__
- )
- _get_event_reference_hashes_txn = (
- DataStore._get_event_reference_hashes_txn.__func__
- )
- add_event_hashes = (
- DataStore.add_event_hashes.__func__
- )
- get_event_reference_hashes = (
- SignatureStore.__dict__["get_event_reference_hashes"]
- )
- get_event_reference_hash = (
- SignatureStore.__dict__["get_event_reference_hash"]
- )
+
+ def get_room_max_stream_ordering(self):
+ return self._stream_id_gen.get_current_token()
+
+ def get_room_min_stream_ordering(self):
+ return self._backfill_id_gen.get_current_token()
def stream_positions(self):
result = super(SlavedEventStore, self).stream_positions()
diff --git a/synapse/replication/slave/storage/profile.py b/synapse/replication/slave/storage/profile.py
new file mode 100644
index 0000000000..46c28d4171
--- /dev/null
+++ b/synapse/replication/slave/storage/profile.py
@@ -0,0 +1,21 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.storage.profile import ProfileWorkerStore
+
+
+class SlavedProfileStore(ProfileWorkerStore, BaseSlavedStore):
+ pass
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
index 83e880fdd2..bb2c40b6e3 100644
--- a/synapse/replication/slave/storage/push_rule.py
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,29 +16,15 @@
from .events import SlavedEventStore
from ._slaved_id_tracker import SlavedIdTracker
-from synapse.storage import DataStore
-from synapse.storage.push_rule import PushRuleStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.storage.push_rule import PushRulesWorkerStore
-class SlavedPushRuleStore(SlavedEventStore):
+class SlavedPushRuleStore(PushRulesWorkerStore, SlavedEventStore):
def __init__(self, db_conn, hs):
- super(SlavedPushRuleStore, self).__init__(db_conn, hs)
self._push_rules_stream_id_gen = SlavedIdTracker(
db_conn, "push_rules_stream", "stream_id",
)
- self.push_rules_stream_cache = StreamChangeCache(
- "PushRulesStreamChangeCache",
- self._push_rules_stream_id_gen.get_current_token(),
- )
-
- get_push_rules_for_user = PushRuleStore.__dict__["get_push_rules_for_user"]
- get_push_rules_enabled_for_user = (
- PushRuleStore.__dict__["get_push_rules_enabled_for_user"]
- )
- have_push_rules_changed_for_user = (
- DataStore.have_push_rules_changed_for_user.__func__
- )
+ super(SlavedPushRuleStore, self).__init__(db_conn, hs)
def get_push_rules_stream_token(self):
return (
@@ -45,6 +32,9 @@ class SlavedPushRuleStore(SlavedEventStore):
self._stream_id_gen.get_current_token(),
)
+ def get_max_push_rules_stream_id(self):
+ return self._push_rules_stream_id_gen.get_current_token()
+
def stream_positions(self):
result = super(SlavedPushRuleStore, self).stream_positions()
result["push_rules"] = self._push_rules_stream_id_gen.get_current_token()
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
index 4e8d68ece9..a7cd5a7291 100644
--- a/synapse/replication/slave/storage/pushers.py
+++ b/synapse/replication/slave/storage/pushers.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,10 +17,10 @@
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
-from synapse.storage import DataStore
+from synapse.storage.pusher import PusherWorkerStore
-class SlavedPusherStore(BaseSlavedStore):
+class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):
def __init__(self, db_conn, hs):
super(SlavedPusherStore, self).__init__(db_conn, hs)
@@ -28,13 +29,6 @@ class SlavedPusherStore(BaseSlavedStore):
extra_tables=[("deleted_pushers", "stream_id")],
)
- get_all_pushers = DataStore.get_all_pushers.__func__
- get_pushers_by = DataStore.get_pushers_by.__func__
- get_pushers_by_app_id_and_pushkey = (
- DataStore.get_pushers_by_app_id_and_pushkey.__func__
- )
- _decode_pushers_rows = DataStore._decode_pushers_rows.__func__
-
def stream_positions(self):
result = super(SlavedPusherStore, self).stream_positions()
result["pushers"] = self._pushers_id_gen.get_current_token()
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index b371574ece..1647072f65 100644
--- a/synapse/replication/slave/storage/receipts.py
+++ b/synapse/replication/slave/storage/receipts.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,9 +17,7 @@
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
-from synapse.storage import DataStore
-from synapse.storage.receipts import ReceiptsStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.storage.receipts import ReceiptsWorkerStore
# So, um, we want to borrow a load of functions intended for reading from
# a DataStore, but we don't want to take functions that either write to the
@@ -29,36 +28,19 @@ from synapse.util.caches.stream_change_cache import StreamChangeCache
# the method descriptor on the DataStore and chuck them into our class.
-class SlavedReceiptsStore(BaseSlavedStore):
+class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore):
def __init__(self, db_conn, hs):
- super(SlavedReceiptsStore, self).__init__(db_conn, hs)
-
+ # We instantiate this first as the ReceiptsWorkerStore constructor
+ # needs to be able to call get_max_receipt_stream_id
self._receipts_id_gen = SlavedIdTracker(
db_conn, "receipts_linearized", "stream_id"
)
- self._receipts_stream_cache = StreamChangeCache(
- "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token()
- )
-
- get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"]
- get_linearized_receipts_for_room = (
- ReceiptsStore.__dict__["get_linearized_receipts_for_room"]
- )
- _get_linearized_receipts_for_rooms = (
- ReceiptsStore.__dict__["_get_linearized_receipts_for_rooms"]
- )
- get_last_receipt_event_id_for_user = (
- ReceiptsStore.__dict__["get_last_receipt_event_id_for_user"]
- )
-
- get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__
- get_all_updated_receipts = DataStore.get_all_updated_receipts.__func__
+ super(SlavedReceiptsStore, self).__init__(db_conn, hs)
- get_linearized_receipts_for_rooms = (
- DataStore.get_linearized_receipts_for_rooms.__func__
- )
+ def get_max_receipt_stream_id(self):
+ return self._receipts_id_gen.get_current_token()
def stream_positions(self):
result = super(SlavedReceiptsStore, self).stream_positions()
@@ -71,6 +53,8 @@ class SlavedReceiptsStore(BaseSlavedStore):
self.get_last_receipt_event_id_for_user.invalidate(
(user_id, room_id, receipt_type)
)
+ self._invalidate_get_users_with_receipts_in_room(room_id, receipt_type, user_id)
+ self.get_receipts_for_room.invalidate((room_id, receipt_type))
def process_replication_rows(self, stream_name, token, rows):
if stream_name == "receipts":
diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py
index e27c7332d2..7323bf0f1e 100644
--- a/synapse/replication/slave/storage/registration.py
+++ b/synapse/replication/slave/storage/registration.py
@@ -14,20 +14,8 @@
# limitations under the License.
from ._base import BaseSlavedStore
-from synapse.storage import DataStore
-from synapse.storage.registration import RegistrationStore
+from synapse.storage.registration import RegistrationWorkerStore
-class SlavedRegistrationStore(BaseSlavedStore):
- def __init__(self, db_conn, hs):
- super(SlavedRegistrationStore, self).__init__(db_conn, hs)
-
- # TODO: use the cached version and invalidate deleted tokens
- get_user_by_access_token = RegistrationStore.__dict__[
- "get_user_by_access_token"
- ]
-
- _query_for_auth = DataStore._query_for_auth.__func__
- get_user_by_id = RegistrationStore.__dict__[
- "get_user_by_id"
- ]
+class SlavedRegistrationStore(RegistrationWorkerStore, BaseSlavedStore):
+ pass
diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py
index f510384033..5ae1670157 100644
--- a/synapse/replication/slave/storage/room.py
+++ b/synapse/replication/slave/storage/room.py
@@ -14,32 +14,19 @@
# limitations under the License.
from ._base import BaseSlavedStore
-from synapse.storage import DataStore
-from synapse.storage.room import RoomStore
+from synapse.storage.room import RoomWorkerStore
from ._slaved_id_tracker import SlavedIdTracker
-class RoomStore(BaseSlavedStore):
+class RoomStore(RoomWorkerStore, BaseSlavedStore):
def __init__(self, db_conn, hs):
super(RoomStore, self).__init__(db_conn, hs)
self._public_room_id_gen = SlavedIdTracker(
db_conn, "public_room_list_stream", "stream_id"
)
- get_public_room_ids = DataStore.get_public_room_ids.__func__
- get_current_public_room_stream_id = (
- DataStore.get_current_public_room_stream_id.__func__
- )
- get_public_room_ids_at_stream_id = (
- RoomStore.__dict__["get_public_room_ids_at_stream_id"]
- )
- get_public_room_ids_at_stream_id_txn = (
- DataStore.get_public_room_ids_at_stream_id_txn.__func__
- )
- get_published_at_stream_id_txn = (
- DataStore.get_published_at_stream_id_txn.__func__
- )
- get_public_room_changes = DataStore.get_public_room_changes.__func__
+ def get_current_public_room_stream_id(self):
+ return self._public_room_id_gen.get_current_token()
def stream_positions(self):
result = super(RoomStore, self).stream_positions()
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 171227cce2..12aac3cc6b 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -19,11 +19,13 @@ allowed to be sent by which side.
"""
import logging
-import ujson as json
+import simplejson
logger = logging.getLogger(__name__)
+_json_encoder = simplejson.JSONEncoder(namedtuple_as_object=False)
+
class Command(object):
"""The base command class.
@@ -100,14 +102,14 @@ class RdataCommand(Command):
return cls(
stream_name,
None if token == "batch" else int(token),
- json.loads(row_json)
+ simplejson.loads(row_json)
)
def to_line(self):
return " ".join((
self.stream_name,
str(self.token) if self.token is not None else "batch",
- json.dumps(self.row),
+ _json_encoder.encode(self.row),
))
@@ -298,10 +300,12 @@ class InvalidateCacheCommand(Command):
def from_line(cls, line):
cache_func, keys_json = line.split(" ", 1)
- return cls(cache_func, json.loads(keys_json))
+ return cls(cache_func, simplejson.loads(keys_json))
def to_line(self):
- return " ".join((self.cache_func, json.dumps(self.keys)))
+ return " ".join((
+ self.cache_func, _json_encoder.encode(self.keys),
+ ))
class UserIpCommand(Command):
@@ -325,14 +329,14 @@ class UserIpCommand(Command):
def from_line(cls, line):
user_id, jsn = line.split(" ", 1)
- access_token, ip, user_agent, device_id, last_seen = json.loads(jsn)
+ access_token, ip, user_agent, device_id, last_seen = simplejson.loads(jsn)
return cls(
user_id, access_token, ip, user_agent, device_id, last_seen
)
def to_line(self):
- return self.user_id + " " + json.dumps((
+ return self.user_id + " " + _json_encoder.encode((
self.access_token, self.ip, self.user_agent, self.device_id,
self.last_seen,
))
|