diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 6b77aec832..ab79a45646 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -39,7 +39,7 @@ from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.room import (
JoinedRoomMemberListRestServlet,
@@ -66,7 +66,7 @@ class ClientReaderSlavedStore(
DirectoryStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,
- TransactionStore,
+ SlavedTransactionStore,
SlavedClientIpStore,
BaseSlavedStore,
):
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index a385793dd4..03d39968a8 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -43,7 +43,7 @@ from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.room import (
JoinRoomAliasServlet,
@@ -63,7 +63,7 @@ logger = logging.getLogger("synapse.app.event_creator")
class EventCreatorSlavedStore(
DirectoryStore,
- TransactionStore,
+ SlavedTransactionStore,
SlavedProfileStore,
SlavedAccountDataStore,
SlavedPusherStore,
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 57d96c13a2..52522e9d33 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -32,11 +32,16 @@ from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
+from synapse.replication.slave.storage.profile import SlavedProfileStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.pushers import SlavedPusherStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -49,11 +54,16 @@ logger = logging.getLogger("synapse.app.federation_reader")
class FederationReaderSlavedStore(
+ SlavedProfileStore,
+ SlavedApplicationServiceStore,
+ SlavedPusherStore,
+ SlavedPushRuleStore,
+ SlavedReceiptsStore,
SlavedEventStore,
SlavedKeyStore,
RoomStore,
DirectoryStore,
- TransactionStore,
+ SlavedTransactionStore,
BaseSlavedStore,
):
pass
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 7bbf0ad082..7a4310ca18 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -36,7 +36,7 @@ from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -50,7 +50,7 @@ logger = logging.getLogger("synapse.app.federation_sender")
class FederationSenderSlaveStore(
- SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
+ SlavedDeviceInboxStore, SlavedTransactionStore, SlavedReceiptsStore, SlavedEventStore,
SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
):
def __init__(self, db_conn, hs):
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 1423056732..fd1f6cbf7e 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -34,7 +34,7 @@ from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.server import HomeServer
@@ -52,7 +52,7 @@ class MediaRepositorySlavedStore(
SlavedApplicationServiceStore,
SlavedRegistrationStore,
SlavedClientIpStore,
- TransactionStore,
+ SlavedTransactionStore,
BaseSlavedStore,
MediaRepositoryStore,
):
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index a23136784a..3e0cd294a1 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -39,6 +39,10 @@ from synapse.federation.federation_base import FederationBase, event_from_pdu_js
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.http.endpoint import parse_server_name
+from synapse.replication.http.federation import (
+ ReplicationFederationSendEduRestServlet,
+ ReplicationGetQueryRestServlet,
+)
from synapse.types import get_domain_from_id
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
@@ -760,6 +764,8 @@ class FederationHandlerRegistry(object):
if edu_type in self.edu_handlers:
raise KeyError("Already have an EDU handler for %s" % (edu_type,))
+ logger.info("Registering federation EDU handler for %r", edu_type)
+
self.edu_handlers[edu_type] = handler
def register_query_handler(self, query_type, handler):
@@ -778,6 +784,8 @@ class FederationHandlerRegistry(object):
"Already have a Query handler for %s" % (query_type,)
)
+ logger.info("Registering federation query handler for %r", query_type)
+
self.query_handlers[query_type] = handler
@defer.inlineCallbacks
@@ -800,3 +808,49 @@ class FederationHandlerRegistry(object):
raise NotFoundError("No handler for Query type '%s'" % (query_type,))
return handler(args)
+
+
+class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
+ """A FederationHandlerRegistry for worker processes.
+
+ When receiving EDU or queries it will check if an appropriate handler has
+ been registered on the worker, if there isn't one then it calls off to the
+ master process.
+ """
+
+ def __init__(self, hs):
+ self.config = hs.config
+ self.http_client = hs.get_simple_http_client()
+ self.clock = hs.get_clock()
+
+ self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
+ self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
+
+ super(ReplicationFederationHandlerRegistry, self).__init__()
+
+ def on_edu(self, edu_type, origin, content):
+ """Overrides FederationHandlerRegistry
+ """
+ handler = self.edu_handlers.get(edu_type)
+ if handler:
+ return super(ReplicationFederationHandlerRegistry, self).on_edu(
+ edu_type, origin, content,
+ )
+
+ return self._send_edu(
+ edu_type=edu_type,
+ origin=origin,
+ content=content,
+ )
+
+ def on_query(self, query_type, args):
+ """Overrides FederationHandlerRegistry
+ """
+ handler = self.query_handlers.get(query_type)
+ if handler:
+ return handler(args)
+
+ return self._get_query_client(
+ query_type=query_type,
+ args=args,
+ )
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2380d17f4e..f38b393e4a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -49,6 +49,11 @@ from synapse.crypto.event_signing import (
compute_event_signature,
)
from synapse.events.validator import EventValidator
+from synapse.replication.http.federation import (
+ ReplicationCleanRoomRestServlet,
+ ReplicationFederationSendEventsRestServlet,
+)
+from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import resolve_events_with_factory
from synapse.types import UserID, get_domain_from_id
from synapse.util import logcontext, unwrapFirstError
@@ -91,6 +96,18 @@ class FederationHandler(BaseHandler):
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
self._server_notices_mxid = hs.config.server_notices_mxid
+ self.config = hs.config
+ self.http_client = hs.get_simple_http_client()
+
+ self._send_events_to_master = (
+ ReplicationFederationSendEventsRestServlet.make_client(hs)
+ )
+ self._notify_user_membership_change = (
+ ReplicationUserJoinedLeftRoomRestServlet.make_client(hs)
+ )
+ self._clean_room_for_join_client = (
+ ReplicationCleanRoomRestServlet.make_client(hs)
+ )
# When joining a room we need to queue any events for that room up
self.room_queues = {}
@@ -1158,7 +1175,7 @@ class FederationHandler(BaseHandler):
)
context = yield self.state_handler.compute_event_context(event)
- yield self._persist_events([(event, context)])
+ yield self.persist_events_and_notify([(event, context)])
defer.returnValue(event)
@@ -1189,7 +1206,7 @@ class FederationHandler(BaseHandler):
)
context = yield self.state_handler.compute_event_context(event)
- yield self._persist_events([(event, context)])
+ yield self.persist_events_and_notify([(event, context)])
defer.returnValue(event)
@@ -1432,7 +1449,7 @@ class FederationHandler(BaseHandler):
event, context
)
- yield self._persist_events(
+ yield self.persist_events_and_notify(
[(event, context)],
backfilled=backfilled,
)
@@ -1470,7 +1487,7 @@ class FederationHandler(BaseHandler):
], consumeErrors=True,
))
- yield self._persist_events(
+ yield self.persist_events_and_notify(
[
(ev_info["event"], context)
for ev_info, context in zip(event_infos, contexts)
@@ -1558,7 +1575,7 @@ class FederationHandler(BaseHandler):
raise
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
- yield self._persist_events(
+ yield self.persist_events_and_notify(
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
@@ -1569,7 +1586,7 @@ class FederationHandler(BaseHandler):
event, old_state=state
)
- yield self._persist_events(
+ yield self.persist_events_and_notify(
[(event, new_event_context)],
)
@@ -2297,7 +2314,7 @@ class FederationHandler(BaseHandler):
for revocation.
"""
try:
- response = yield self.hs.get_simple_http_client().get_json(
+ response = yield self.http_client.get_json(
url,
{"public_key": public_key}
)
@@ -2310,7 +2327,7 @@ class FederationHandler(BaseHandler):
raise AuthError(403, "Third party certificate was invalid")
@defer.inlineCallbacks
- def _persist_events(self, event_and_contexts, backfilled=False):
+ def persist_events_and_notify(self, event_and_contexts, backfilled=False):
"""Persists events and tells the notifier/pushers about them, if
necessary.
@@ -2322,14 +2339,21 @@ class FederationHandler(BaseHandler):
Returns:
Deferred
"""
- max_stream_id = yield self.store.persist_events(
- event_and_contexts,
- backfilled=backfilled,
- )
+ if self.config.worker_app:
+ yield self._send_events_to_master(
+ store=self.store,
+ event_and_contexts=event_and_contexts,
+ backfilled=backfilled
+ )
+ else:
+ max_stream_id = yield self.store.persist_events(
+ event_and_contexts,
+ backfilled=backfilled,
+ )
- if not backfilled: # Never notify for backfilled events
- for event, _ in event_and_contexts:
- self._notify_persisted_event(event, max_stream_id)
+ if not backfilled: # Never notify for backfilled events
+ for event, _ in event_and_contexts:
+ self._notify_persisted_event(event, max_stream_id)
def _notify_persisted_event(self, event, max_stream_id):
"""Checks to see if notifier/pushers should be notified about the
@@ -2368,9 +2392,25 @@ class FederationHandler(BaseHandler):
)
def _clean_room_for_join(self, room_id):
- return self.store.clean_room_for_join(room_id)
+ """Called to clean up any data in DB for a given room, ready for the
+ server to join the room.
+
+ Args:
+ room_id (str)
+ """
+ if self.config.worker_app:
+ return self._clean_room_for_join_client(room_id)
+ else:
+ return self.store.clean_room_for_join(room_id)
def user_joined_room(self, user, room_id):
"""Called when a new user has joined the room
"""
- return user_joined_room(self.distributor, user, room_id)
+ if self.config.worker_app:
+ return self._notify_user_membership_change(
+ room_id=room_id,
+ user_id=user.to_string(),
+ change="joined",
+ )
+ else:
+ return user_joined_room(self.distributor, user, room_id)
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index 589ee94c66..19f214281e 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -14,7 +14,7 @@
# limitations under the License.
from synapse.http.server import JsonResource
-from synapse.replication.http import membership, send_event
+from synapse.replication.http import federation, membership, send_event
REPLICATION_PREFIX = "/_synapse/replication"
@@ -27,3 +27,4 @@ class ReplicationRestResource(JsonResource):
def register_servlets(self, hs):
send_event.register_servlets(hs, self)
membership.register_servlets(hs, self)
+ federation.register_servlets(hs, self)
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
new file mode 100644
index 0000000000..2ddd18f73b
--- /dev/null
+++ b/synapse/replication/http/federation.py
@@ -0,0 +1,259 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.events import FrozenEvent
+from synapse.events.snapshot import EventContext
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
+from synapse.util.metrics import Measure
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
+ """Handles events newly received from federation, including persisting and
+ notifying.
+
+ The API looks like:
+
+ POST /_synapse/replication/fed_send_events/:txn_id
+
+ {
+ "events": [{
+ "event": { .. serialized event .. },
+ "internal_metadata": { .. serialized internal_metadata .. },
+ "rejected_reason": .., // The event.rejected_reason field
+ "context": { .. serialized event context .. },
+ }],
+ "backfilled": false
+ """
+
+ NAME = "fed_send_events"
+ PATH_ARGS = ()
+
+ def __init__(self, hs):
+ super(ReplicationFederationSendEventsRestServlet, self).__init__(hs)
+
+ self.store = hs.get_datastore()
+ self.clock = hs.get_clock()
+ self.federation_handler = hs.get_handlers().federation_handler
+
+ @staticmethod
+ @defer.inlineCallbacks
+ def _serialize_payload(store, event_and_contexts, backfilled):
+ """
+ Args:
+ store
+ event_and_contexts (list[tuple[FrozenEvent, EventContext]])
+ backfilled (bool): Whether or not the events are the result of
+ backfilling
+ """
+ event_payloads = []
+ for event, context in event_and_contexts:
+ serialized_context = yield context.serialize(event, store)
+
+ event_payloads.append({
+ "event": event.get_pdu_json(),
+ "internal_metadata": event.internal_metadata.get_dict(),
+ "rejected_reason": event.rejected_reason,
+ "context": serialized_context,
+ })
+
+ payload = {
+ "events": event_payloads,
+ "backfilled": backfilled,
+ }
+
+ defer.returnValue(payload)
+
+ @defer.inlineCallbacks
+ def _handle_request(self, request):
+ with Measure(self.clock, "repl_fed_send_events_parse"):
+ content = parse_json_object_from_request(request)
+
+ backfilled = content["backfilled"]
+
+ event_payloads = content["events"]
+
+ event_and_contexts = []
+ for event_payload in event_payloads:
+ event_dict = event_payload["event"]
+ internal_metadata = event_payload["internal_metadata"]
+ rejected_reason = event_payload["rejected_reason"]
+ event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
+
+ context = yield EventContext.deserialize(
+ self.store, event_payload["context"],
+ )
+
+ event_and_contexts.append((event, context))
+
+ logger.info(
+ "Got %d events from federation",
+ len(event_and_contexts),
+ )
+
+ yield self.federation_handler.persist_events_and_notify(
+ event_and_contexts, backfilled,
+ )
+
+ defer.returnValue((200, {}))
+
+
+class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
+ """Handles EDUs newly received from federation, including persisting and
+ notifying.
+
+ Request format:
+
+ POST /_synapse/replication/fed_send_edu/:edu_type/:txn_id
+
+ {
+ "origin": ...,
+ "content: { ... }
+ }
+ """
+
+ NAME = "fed_send_edu"
+ PATH_ARGS = ("edu_type",)
+
+ def __init__(self, hs):
+ super(ReplicationFederationSendEduRestServlet, self).__init__(hs)
+
+ self.store = hs.get_datastore()
+ self.clock = hs.get_clock()
+ self.registry = hs.get_federation_registry()
+
+ @staticmethod
+ def _serialize_payload(edu_type, origin, content):
+ return {
+ "origin": origin,
+ "content": content,
+ }
+
+ @defer.inlineCallbacks
+ def _handle_request(self, request, edu_type):
+ with Measure(self.clock, "repl_fed_send_edu_parse"):
+ content = parse_json_object_from_request(request)
+
+ origin = content["origin"]
+ edu_content = content["content"]
+
+ logger.info(
+ "Got %r edu from $s",
+ edu_type, origin,
+ )
+
+ result = yield self.registry.on_edu(edu_type, origin, edu_content)
+
+ defer.returnValue((200, result))
+
+
+class ReplicationGetQueryRestServlet(ReplicationEndpoint):
+ """Handle responding to queries from federation.
+
+ Request format:
+
+ POST /_synapse/replication/fed_query/:query_type
+
+ {
+ "args": { ... }
+ }
+ """
+
+ NAME = "fed_query"
+ PATH_ARGS = ("query_type",)
+
+ # This is a query, so let's not bother caching
+ CACHE = False
+
+ def __init__(self, hs):
+ super(ReplicationGetQueryRestServlet, self).__init__(hs)
+
+ self.store = hs.get_datastore()
+ self.clock = hs.get_clock()
+ self.registry = hs.get_federation_registry()
+
+ @staticmethod
+ def _serialize_payload(query_type, args):
+ """
+ Args:
+ query_type (str)
+ args (dict): The arguments received for the given query type
+ """
+ return {
+ "args": args,
+ }
+
+ @defer.inlineCallbacks
+ def _handle_request(self, request, query_type):
+ with Measure(self.clock, "repl_fed_query_parse"):
+ content = parse_json_object_from_request(request)
+
+ args = content["args"]
+
+ logger.info(
+ "Got %r query",
+ query_type,
+ )
+
+ result = yield self.registry.on_query(query_type, args)
+
+ defer.returnValue((200, result))
+
+
+class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
+ """Called to clean up any data in DB for a given room, ready for the
+ server to join the room.
+
+ Request format:
+
+ POST /_synapse/replication/fed_query/:fed_cleanup_room/:txn_id
+
+ {}
+ """
+
+ NAME = "fed_cleanup_room"
+ PATH_ARGS = ("room_id",)
+
+ def __init__(self, hs):
+ super(ReplicationCleanRoomRestServlet, self).__init__(hs)
+
+ self.store = hs.get_datastore()
+
+ @staticmethod
+ def _serialize_payload(room_id, args):
+ """
+ Args:
+ room_id (str)
+ """
+ return {}
+
+ @defer.inlineCallbacks
+ def _handle_request(self, request, room_id):
+ yield self.store.clean_room_for_join(room_id)
+
+ defer.returnValue((200, {}))
+
+
+def register_servlets(hs, http_server):
+ ReplicationFederationSendEventsRestServlet(hs).register(http_server)
+ ReplicationFederationSendEduRestServlet(hs).register(http_server)
+ ReplicationGetQueryRestServlet(hs).register(http_server)
+ ReplicationCleanRoomRestServlet(hs).register(http_server)
diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py
index 9c9a5eadd9..3527beb3c9 100644
--- a/synapse/replication/slave/storage/transactions.py
+++ b/synapse/replication/slave/storage/transactions.py
@@ -13,19 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage import DataStore
from synapse.storage.transactions import TransactionStore
from ._base import BaseSlavedStore
-class TransactionStore(BaseSlavedStore):
- get_destination_retry_timings = TransactionStore.__dict__[
- "get_destination_retry_timings"
- ]
- _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__
- set_destination_retry_timings = DataStore.set_destination_retry_timings.__func__
- _set_destination_retry_timings = DataStore._set_destination_retry_timings.__func__
-
- prep_send_transaction = DataStore.prep_send_transaction.__func__
- delivered_txn = DataStore.delivered_txn.__func__
+class SlavedTransactionStore(TransactionStore, BaseSlavedStore):
+ pass
diff --git a/synapse/server.py b/synapse/server.py
index 140be9ebe8..26228d8c72 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -36,6 +36,7 @@ from synapse.federation.federation_client import FederationClient
from synapse.federation.federation_server import (
FederationHandlerRegistry,
FederationServer,
+ ReplicationFederationHandlerRegistry,
)
from synapse.federation.send_queue import FederationRemoteSendQueue
from synapse.federation.transaction_queue import TransactionQueue
@@ -423,7 +424,10 @@ class HomeServer(object):
return RoomMemberMasterHandler(self)
def build_federation_registry(self):
- return FederationHandlerRegistry()
+ if self.config.worker_app:
+ return ReplicationFederationHandlerRegistry(self)
+ else:
+ return FederationHandlerRegistry()
def build_server_notices_manager(self):
if self.config.worker_app:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index d4aa192a0a..135af54fa9 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1436,88 +1436,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
)
@defer.inlineCallbacks
- def have_events_in_timeline(self, event_ids):
- """Given a list of event ids, check if we have already processed and
- stored them as non outliers.
- """
- rows = yield self._simple_select_many_batch(
- table="events",
- retcols=("event_id",),
- column="event_id",
- iterable=list(event_ids),
- keyvalues={"outlier": False},
- desc="have_events_in_timeline",
- )
-
- defer.returnValue(set(r["event_id"] for r in rows))
-
- @defer.inlineCallbacks
- def have_seen_events(self, event_ids):
- """Given a list of event ids, check if we have already processed them.
-
- Args:
- event_ids (iterable[str]):
-
- Returns:
- Deferred[set[str]]: The events we have already seen.
- """
- results = set()
-
- def have_seen_events_txn(txn, chunk):
- sql = (
- "SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
- % (",".join("?" * len(chunk)), )
- )
- txn.execute(sql, chunk)
- for (event_id, ) in txn:
- results.add(event_id)
-
- # break the input up into chunks of 100
- input_iterator = iter(event_ids)
- for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
- []):
- yield self.runInteraction(
- "have_seen_events",
- have_seen_events_txn,
- chunk,
- )
- defer.returnValue(results)
-
- def get_seen_events_with_rejections(self, event_ids):
- """Given a list of event ids, check if we rejected them.
-
- Args:
- event_ids (list[str])
-
- Returns:
- Deferred[dict[str, str|None):
- Has an entry for each event id we already have seen. Maps to
- the rejected reason string if we rejected the event, else maps
- to None.
- """
- if not event_ids:
- return defer.succeed({})
-
- def f(txn):
- sql = (
- "SELECT e.event_id, reason FROM events as e "
- "LEFT JOIN rejections as r ON e.event_id = r.event_id "
- "WHERE e.event_id = ?"
- )
-
- res = {}
- for event_id in event_ids:
- txn.execute(sql, (event_id,))
- row = txn.fetchone()
- if row:
- _, rejected = row
- res[event_id] = rejected
-
- return res
-
- return self.runInteraction("get_rejection_reasons", f)
-
- @defer.inlineCallbacks
def count_daily_messages(self):
"""
Returns an estimate of the number of messages sent in the last day.
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 9b4cfeb899..59822178ff 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import itertools
import logging
from collections import namedtuple
@@ -442,3 +443,85 @@ class EventsWorkerStore(SQLBaseStore):
self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
defer.returnValue(cache_entry)
+
+ @defer.inlineCallbacks
+ def have_events_in_timeline(self, event_ids):
+ """Given a list of event ids, check if we have already processed and
+ stored them as non outliers.
+ """
+ rows = yield self._simple_select_many_batch(
+ table="events",
+ retcols=("event_id",),
+ column="event_id",
+ iterable=list(event_ids),
+ keyvalues={"outlier": False},
+ desc="have_events_in_timeline",
+ )
+
+ defer.returnValue(set(r["event_id"] for r in rows))
+
+ @defer.inlineCallbacks
+ def have_seen_events(self, event_ids):
+ """Given a list of event ids, check if we have already processed them.
+
+ Args:
+ event_ids (iterable[str]):
+
+ Returns:
+ Deferred[set[str]]: The events we have already seen.
+ """
+ results = set()
+
+ def have_seen_events_txn(txn, chunk):
+ sql = (
+ "SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
+ % (",".join("?" * len(chunk)), )
+ )
+ txn.execute(sql, chunk)
+ for (event_id, ) in txn:
+ results.add(event_id)
+
+ # break the input up into chunks of 100
+ input_iterator = iter(event_ids)
+ for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
+ []):
+ yield self.runInteraction(
+ "have_seen_events",
+ have_seen_events_txn,
+ chunk,
+ )
+ defer.returnValue(results)
+
+ def get_seen_events_with_rejections(self, event_ids):
+ """Given a list of event ids, check if we rejected them.
+
+ Args:
+ event_ids (list[str])
+
+ Returns:
+ Deferred[dict[str, str|None):
+ Has an entry for each event id we already have seen. Maps to
+ the rejected reason string if we rejected the event, else maps
+ to None.
+ """
+ if not event_ids:
+ return defer.succeed({})
+
+ def f(txn):
+ sql = (
+ "SELECT e.event_id, reason FROM events as e "
+ "LEFT JOIN rejections as r ON e.event_id = r.event_id "
+ "WHERE e.event_id = ?"
+ )
+
+ res = {}
+ for event_id in event_ids:
+ txn.execute(sql, (event_id,))
+ row = txn.fetchone()
+ if row:
+ _, rejected = row
+ res[event_id] = rejected
+
+ return res
+
+ return self.runInteraction("get_rejection_reasons", f)
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 3147fb6827..3378fc77d1 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -41,6 +41,22 @@ RatelimitOverride = collections.namedtuple(
class RoomWorkerStore(SQLBaseStore):
+ def get_room(self, room_id):
+ """Retrieve a room.
+
+ Args:
+ room_id (str): The ID of the room to retrieve.
+ Returns:
+ A namedtuple containing the room information, or an empty list.
+ """
+ return self._simple_select_one(
+ table="rooms",
+ keyvalues={"room_id": room_id},
+ retcols=("room_id", "is_public", "creator"),
+ desc="get_room",
+ allow_none=True,
+ )
+
def get_public_room_ids(self):
return self._simple_select_onecol(
table="rooms",
@@ -215,22 +231,6 @@ class RoomStore(RoomWorkerStore, SearchStore):
logger.error("store_room with room_id=%s failed: %s", room_id, e)
raise StoreError(500, "Problem creating room.")
- def get_room(self, room_id):
- """Retrieve a room.
-
- Args:
- room_id (str): The ID of the room to retrieve.
- Returns:
- A namedtuple containing the room information, or an empty list.
- """
- return self._simple_select_one(
- table="rooms",
- keyvalues={"room_id": room_id},
- retcols=("room_id", "is_public", "creator"),
- desc="get_room",
- allow_none=True,
- )
-
@defer.inlineCallbacks
def set_room_is_public(self, room_id, is_public):
def set_room_is_public_txn(txn, next_id):
|