summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/app/federation_reader.py10
-rw-r--r--synapse/federation/federation_server.py43
-rw-r--r--synapse/handlers/federation.py46
-rw-r--r--synapse/replication/http/__init__.py3
-rw-r--r--synapse/replication/http/federation.py245
-rw-r--r--synapse/replication/slave/storage/transactions.py13
-rw-r--r--synapse/server.py6
-rw-r--r--synapse/storage/events.py82
-rw-r--r--synapse/storage/events_worker.py83
-rw-r--r--synapse/storage/room.py32
10 files changed, 443 insertions, 120 deletions
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 7af00b8bcf..0c557a8242 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -32,9 +32,14 @@ from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
+from synapse.replication.slave.storage.profile import SlavedProfileStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.pushers import SlavedPusherStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
@@ -49,6 +54,11 @@ logger = logging.getLogger("synapse.app.federation_reader")
 
 
 class FederationReaderSlavedStore(
+    SlavedProfileStore,
+    SlavedApplicationServiceStore,
+    SlavedPusherStore,
+    SlavedPushRuleStore,
+    SlavedReceiptsStore,
     SlavedEventStore,
     SlavedKeyStore,
     RoomStore,
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 2b62f687b6..d23c1cf13b 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -39,6 +39,10 @@ from synapse.federation.federation_base import FederationBase, event_from_pdu_js
 from synapse.federation.persistence import TransactionActions
 from synapse.federation.units import Edu, Transaction
 from synapse.http.endpoint import parse_server_name
+from synapse.replication.http.federation import (
+    ReplicationFederationSendEduRestServlet,
+    ReplicationGetQueryRestServlet,
+)
 from synapse.types import get_domain_from_id
 from synapse.util import async
 from synapse.util.caches.response_cache import ResponseCache
@@ -760,6 +764,8 @@ class FederationHandlerRegistry(object):
         if edu_type in self.edu_handlers:
             raise KeyError("Already have an EDU handler for %s" % (edu_type,))
 
+        logger.info("Registering federation EDU handler for %r", edu_type)
+
         self.edu_handlers[edu_type] = handler
 
     def register_query_handler(self, query_type, handler):
@@ -778,6 +784,8 @@ class FederationHandlerRegistry(object):
                 "Already have a Query handler for %s" % (query_type,)
             )
 
+        logger.info("Registering federation query handler for %r", query_type)
+
         self.query_handlers[query_type] = handler
 
     @defer.inlineCallbacks
@@ -800,3 +808,38 @@ class FederationHandlerRegistry(object):
             raise NotFoundError("No handler for Query type '%s'" % (query_type,))
 
         return handler(args)
+
+
+class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
+    def __init__(self, hs):
+        self.config = hs.config
+        self.http_client = hs.get_simple_http_client()
+        self.clock = hs.get_clock()
+
+        self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
+        self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
+
+        super(ReplicationFederationHandlerRegistry, self).__init__()
+
+    def on_edu(self, edu_type, origin, content):
+        handler = self.edu_handlers.get(edu_type)
+        if handler:
+            return super(ReplicationFederationHandlerRegistry, self).on_edu(
+                edu_type, origin, content,
+            )
+
+        return self._send_edu(
+                edu_type=edu_type,
+                origin=origin,
+                content=content,
+        )
+
+    def on_query(self, query_type, args):
+        handler = self.query_handlers.get(query_type)
+        if handler:
+            return handler(args)
+
+        return self._get_query_client(
+                query_type=query_type,
+                args=args,
+        )
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0dffd44e22..37d2307d0a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -49,6 +49,10 @@ from synapse.crypto.event_signing import (
     compute_event_signature,
 )
 from synapse.events.validator import EventValidator
+from synapse.replication.http.federation import (
+    ReplicationFederationSendEventsRestServlet,
+)
+from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
 from synapse.state import resolve_events_with_factory
 from synapse.types import UserID, get_domain_from_id
 from synapse.util import logcontext, unwrapFirstError
@@ -91,6 +95,15 @@ class FederationHandler(BaseHandler):
         self.spam_checker = hs.get_spam_checker()
         self.event_creation_handler = hs.get_event_creation_handler()
         self._server_notices_mxid = hs.config.server_notices_mxid
+        self.config = hs.config
+        self.http_client = hs.get_simple_http_client()
+
+        self._send_events_to_master = (
+            ReplicationFederationSendEventsRestServlet.make_client(hs)
+        )
+        self._notify_user_membership_change = (
+            ReplicationUserJoinedLeftRoomRestServlet.make_client(hs)
+        )
 
         # When joining a room we need to queue any events for that room up
         self.room_queues = {}
@@ -2297,7 +2310,7 @@ class FederationHandler(BaseHandler):
                 for revocation.
         """
         try:
-            response = yield self.hs.get_simple_http_client().get_json(
+            response = yield self.http_client.get_json(
                 url,
                 {"public_key": public_key}
             )
@@ -2322,14 +2335,21 @@ class FederationHandler(BaseHandler):
         Returns:
             Deferred
         """
-        max_stream_id = yield self.store.persist_events(
-            event_and_contexts,
-            backfilled=backfilled,
-        )
+        if self.config.worker_app:
+            yield self._send_events_to_master(
+                store=self.store,
+                event_and_contexts=event_and_contexts,
+                backfilled=backfilled
+            )
+        else:
+            max_stream_id = yield self.store.persist_events(
+                event_and_contexts,
+                backfilled=backfilled,
+            )
 
-        if not backfilled:  # Never notify for backfilled events
-            for event, _ in event_and_contexts:
-                self._notify_persisted_event(event, max_stream_id)
+            if not backfilled:  # Never notify for backfilled events
+                for event, _ in event_and_contexts:
+                    self._notify_persisted_event(event, max_stream_id)
 
     def _notify_persisted_event(self, event, max_stream_id):
         """Checks to see if notifier/pushers should be notified about the
@@ -2368,9 +2388,17 @@ class FederationHandler(BaseHandler):
         )
 
     def _clean_room_for_join(self, room_id):
+        # TODO move this out to master
         return self.store.clean_room_for_join(room_id)
 
     def user_joined_room(self, user, room_id):
         """Called when a new user has joined the room
         """
-        return user_joined_room(self.distributor, user, room_id)
+        if self.config.worker_app:
+            return self._notify_user_membership_change(
+                room_id=room_id,
+                user_id=user.to_string(),
+                change="joined",
+            )
+        else:
+            return user_joined_room(self.distributor, user, room_id)
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index 589ee94c66..19f214281e 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 from synapse.http.server import JsonResource
-from synapse.replication.http import membership, send_event
+from synapse.replication.http import federation, membership, send_event
 
 REPLICATION_PREFIX = "/_synapse/replication"
 
@@ -27,3 +27,4 @@ class ReplicationRestResource(JsonResource):
     def register_servlets(self, hs):
         send_event.register_servlets(hs, self)
         membership.register_servlets(hs, self)
+        federation.register_servlets(hs, self)
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
new file mode 100644
index 0000000000..3fa7bd64c7
--- /dev/null
+++ b/synapse/replication/http/federation.py
@@ -0,0 +1,245 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, Membership
+from synapse.events import FrozenEvent
+from synapse.events.snapshot import EventContext
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
+from synapse.types import UserID
+from synapse.util.logcontext import run_in_background
+from synapse.util.metrics import Measure
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
+    """Handles events newly received from federation, including persisting and
+    notifying.
+
+    The API looks like:
+
+        POST /_synapse/replication/fed_send_events/:txn_id
+
+        {
+            "events": [{
+                "event": { .. serialized event .. },
+                "internal_metadata": { .. serialized internal_metadata .. },
+                "rejected_reason": ..,   // The event.rejected_reason field
+                "context": { .. serialized event context .. },
+            }],
+            "backfilled": false
+    """
+
+    NAME = "fed_send_events"
+    PATH_ARGS = ()
+
+    def __init__(self, hs):
+        super(ReplicationFederationSendEventsRestServlet, self).__init__(hs)
+
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+        self.is_mine_id = hs.is_mine_id
+        self.notifier = hs.get_notifier()
+        self.pusher_pool = hs.get_pusherpool()
+
+    @staticmethod
+    @defer.inlineCallbacks
+    def _serialize_payload(store, event_and_contexts, backfilled):
+        """
+        Args:
+            store
+            event_and_contexts (list[tuple[FrozenEvent, EventContext]])
+            backfilled (bool): Whether or not the events are the result of
+                backfilling
+        """
+        event_payloads = []
+        for event, context in event_and_contexts:
+            serialized_context = yield context.serialize(event, store)
+
+            event_payloads.append({
+                "event": event.get_pdu_json(),
+                "internal_metadata": event.internal_metadata.get_dict(),
+                "rejected_reason": event.rejected_reason,
+                "context": serialized_context,
+            })
+
+        payload = {
+            "events": event_payloads,
+            "backfilled": backfilled,
+        }
+
+        defer.returnValue(payload)
+
+    @defer.inlineCallbacks
+    def _handle_request(self, request):
+        with Measure(self.clock, "repl_fed_send_events_parse"):
+            content = parse_json_object_from_request(request)
+
+            backfilled = content["backfilled"]
+
+            event_payloads = content["events"]
+
+            event_and_contexts = []
+            for event_payload in event_payloads:
+                event_dict = event_payload["event"]
+                internal_metadata = event_payload["internal_metadata"]
+                rejected_reason = event_payload["rejected_reason"]
+                event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
+
+                context = yield EventContext.deserialize(
+                    self.store, event_payload["context"],
+                )
+
+                event_and_contexts.append((event, context))
+
+        logger.info(
+            "Got %d events from federation",
+            len(event_and_contexts),
+        )
+
+        max_stream_id = yield self.store.persist_events(
+            event_and_contexts,
+            backfilled=backfilled
+        )
+
+        if not backfilled:
+            for event, _ in event_and_contexts:
+                self._notify_persisted_event(event, max_stream_id)
+
+        defer.returnValue((200, {}))
+
+    def _notify_persisted_event(self, event, max_stream_id):
+        extra_users = []
+        if event.type == EventTypes.Member:
+            target_user_id = event.state_key
+
+            # We notify for memberships if its an invite for one of our
+            # users
+            if event.internal_metadata.is_outlier():
+                if event.membership != Membership.INVITE:
+                    if not self.is_mine_id(target_user_id):
+                        return
+
+            target_user = UserID.from_string(target_user_id)
+            extra_users.append(target_user)
+        elif event.internal_metadata.is_outlier():
+            return
+
+        event_stream_id = event.internal_metadata.stream_ordering
+        self.notifier.on_new_room_event(
+            event, event_stream_id, max_stream_id,
+            extra_users=extra_users
+        )
+
+        run_in_background(
+            self.pusher_pool.on_new_notifications,
+            event_stream_id, max_stream_id,
+        )
+
+
+class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
+    """Handles EDUs newly received from federation, including persisting and
+    notifying.
+    """
+
+    NAME = "fed_send_edu"
+    PATH_ARGS = ("edu_type",)
+
+    def __init__(self, hs):
+        super(ReplicationFederationSendEduRestServlet, self).__init__(hs)
+
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+        self.registry = hs.get_federation_registry()
+
+    @staticmethod
+    def _serialize_payload(edu_type, origin, content):
+        return {
+            "origin": origin,
+            "content": content,
+        }
+
+    @defer.inlineCallbacks
+    def _handle_request(self, request, edu_type):
+        with Measure(self.clock, "repl_fed_send_edu_parse"):
+            content = parse_json_object_from_request(request)
+
+            origin = content["origin"]
+            edu_content = content["content"]
+
+        logger.info(
+            "Got %r edu from $s",
+            edu_type, origin,
+        )
+
+        result = yield self.registry.on_edu(edu_type, origin, edu_content)
+
+        defer.returnValue((200, result))
+
+
+class ReplicationGetQueryRestServlet(ReplicationEndpoint):
+    """Handle responding to queries from federation.
+    """
+
+    NAME = "fed_query"
+    PATH_ARGS = ("query_type",)
+
+    # This is a query, so let's not bother caching
+    CACHE = False
+
+    def __init__(self, hs):
+        super(ReplicationGetQueryRestServlet, self).__init__(hs)
+
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+        self.registry = hs.get_federation_registry()
+
+    @staticmethod
+    def _serialize_payload(query_type, args):
+        """
+        Args:
+            query_type (str)
+            args (dict): The arguments received for the given query type
+        """
+        return {
+            "args": args,
+        }
+
+    @defer.inlineCallbacks
+    def _handle_request(self, request, query_type):
+        with Measure(self.clock, "repl_fed_query_parse"):
+            content = parse_json_object_from_request(request)
+
+            args = content["args"]
+
+        logger.info(
+            "Got %r query",
+            query_type,
+        )
+
+        result = yield self.registry.on_query(query_type, args)
+
+        defer.returnValue((200, result))
+
+
+def register_servlets(hs, http_server):
+    ReplicationFederationSendEventsRestServlet(hs).register(http_server)
+    ReplicationFederationSendEduRestServlet(hs).register(http_server)
+    ReplicationGetQueryRestServlet(hs).register(http_server)
diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py
index 9c9a5eadd9..cd6416c47c 100644
--- a/synapse/replication/slave/storage/transactions.py
+++ b/synapse/replication/slave/storage/transactions.py
@@ -13,19 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage import DataStore
 from synapse.storage.transactions import TransactionStore
 
 from ._base import BaseSlavedStore
 
 
-class TransactionStore(BaseSlavedStore):
-    get_destination_retry_timings = TransactionStore.__dict__[
-        "get_destination_retry_timings"
-    ]
-    _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__
-    set_destination_retry_timings = DataStore.set_destination_retry_timings.__func__
-    _set_destination_retry_timings = DataStore._set_destination_retry_timings.__func__
-
-    prep_send_transaction = DataStore.prep_send_transaction.__func__
-    delivered_txn = DataStore.delivered_txn.__func__
+class TransactionStore(TransactionStore, BaseSlavedStore):
+    pass
diff --git a/synapse/server.py b/synapse/server.py
index 140be9ebe8..26228d8c72 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -36,6 +36,7 @@ from synapse.federation.federation_client import FederationClient
 from synapse.federation.federation_server import (
     FederationHandlerRegistry,
     FederationServer,
+    ReplicationFederationHandlerRegistry,
 )
 from synapse.federation.send_queue import FederationRemoteSendQueue
 from synapse.federation.transaction_queue import TransactionQueue
@@ -423,7 +424,10 @@ class HomeServer(object):
         return RoomMemberMasterHandler(self)
 
     def build_federation_registry(self):
-        return FederationHandlerRegistry()
+        if self.config.worker_app:
+            return ReplicationFederationHandlerRegistry(self)
+        else:
+            return FederationHandlerRegistry()
 
     def build_server_notices_manager(self):
         if self.config.worker_app:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ce32e8fefd..ccfda5b0fa 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1436,88 +1436,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
         )
 
     @defer.inlineCallbacks
-    def have_events_in_timeline(self, event_ids):
-        """Given a list of event ids, check if we have already processed and
-        stored them as non outliers.
-        """
-        rows = yield self._simple_select_many_batch(
-            table="events",
-            retcols=("event_id",),
-            column="event_id",
-            iterable=list(event_ids),
-            keyvalues={"outlier": False},
-            desc="have_events_in_timeline",
-        )
-
-        defer.returnValue(set(r["event_id"] for r in rows))
-
-    @defer.inlineCallbacks
-    def have_seen_events(self, event_ids):
-        """Given a list of event ids, check if we have already processed them.
-
-        Args:
-            event_ids (iterable[str]):
-
-        Returns:
-            Deferred[set[str]]: The events we have already seen.
-        """
-        results = set()
-
-        def have_seen_events_txn(txn, chunk):
-            sql = (
-                "SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
-                % (",".join("?" * len(chunk)), )
-            )
-            txn.execute(sql, chunk)
-            for (event_id, ) in txn:
-                results.add(event_id)
-
-        # break the input up into chunks of 100
-        input_iterator = iter(event_ids)
-        for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
-                          []):
-            yield self.runInteraction(
-                "have_seen_events",
-                have_seen_events_txn,
-                chunk,
-            )
-        defer.returnValue(results)
-
-    def get_seen_events_with_rejections(self, event_ids):
-        """Given a list of event ids, check if we rejected them.
-
-        Args:
-            event_ids (list[str])
-
-        Returns:
-            Deferred[dict[str, str|None):
-                Has an entry for each event id we already have seen. Maps to
-                the rejected reason string if we rejected the event, else maps
-                to None.
-        """
-        if not event_ids:
-            return defer.succeed({})
-
-        def f(txn):
-            sql = (
-                "SELECT e.event_id, reason FROM events as e "
-                "LEFT JOIN rejections as r ON e.event_id = r.event_id "
-                "WHERE e.event_id = ?"
-            )
-
-            res = {}
-            for event_id in event_ids:
-                txn.execute(sql, (event_id,))
-                row = txn.fetchone()
-                if row:
-                    _, rejected = row
-                    res[event_id] = rejected
-
-            return res
-
-        return self.runInteraction("get_rejection_reasons", f)
-
-    @defer.inlineCallbacks
     def count_daily_messages(self):
         """
         Returns an estimate of the number of messages sent in the last day.
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 9b4cfeb899..59822178ff 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import itertools
 import logging
 from collections import namedtuple
 
@@ -442,3 +443,85 @@ class EventsWorkerStore(SQLBaseStore):
             self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
 
         defer.returnValue(cache_entry)
+
+    @defer.inlineCallbacks
+    def have_events_in_timeline(self, event_ids):
+        """Given a list of event ids, check if we have already processed and
+        stored them as non outliers.
+        """
+        rows = yield self._simple_select_many_batch(
+            table="events",
+            retcols=("event_id",),
+            column="event_id",
+            iterable=list(event_ids),
+            keyvalues={"outlier": False},
+            desc="have_events_in_timeline",
+        )
+
+        defer.returnValue(set(r["event_id"] for r in rows))
+
+    @defer.inlineCallbacks
+    def have_seen_events(self, event_ids):
+        """Given a list of event ids, check if we have already processed them.
+
+        Args:
+            event_ids (iterable[str]):
+
+        Returns:
+            Deferred[set[str]]: The events we have already seen.
+        """
+        results = set()
+
+        def have_seen_events_txn(txn, chunk):
+            sql = (
+                "SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
+                % (",".join("?" * len(chunk)), )
+            )
+            txn.execute(sql, chunk)
+            for (event_id, ) in txn:
+                results.add(event_id)
+
+        # break the input up into chunks of 100
+        input_iterator = iter(event_ids)
+        for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
+                          []):
+            yield self.runInteraction(
+                "have_seen_events",
+                have_seen_events_txn,
+                chunk,
+            )
+        defer.returnValue(results)
+
+    def get_seen_events_with_rejections(self, event_ids):
+        """Given a list of event ids, check if we rejected them.
+
+        Args:
+            event_ids (list[str])
+
+        Returns:
+            Deferred[dict[str, str|None):
+                Has an entry for each event id we already have seen. Maps to
+                the rejected reason string if we rejected the event, else maps
+                to None.
+        """
+        if not event_ids:
+            return defer.succeed({})
+
+        def f(txn):
+            sql = (
+                "SELECT e.event_id, reason FROM events as e "
+                "LEFT JOIN rejections as r ON e.event_id = r.event_id "
+                "WHERE e.event_id = ?"
+            )
+
+            res = {}
+            for event_id in event_ids:
+                txn.execute(sql, (event_id,))
+                row = txn.fetchone()
+                if row:
+                    _, rejected = row
+                    res[event_id] = rejected
+
+            return res
+
+        return self.runInteraction("get_rejection_reasons", f)
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 3147fb6827..3378fc77d1 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -41,6 +41,22 @@ RatelimitOverride = collections.namedtuple(
 
 
 class RoomWorkerStore(SQLBaseStore):
+    def get_room(self, room_id):
+        """Retrieve a room.
+
+        Args:
+            room_id (str): The ID of the room to retrieve.
+        Returns:
+            A namedtuple containing the room information, or an empty list.
+        """
+        return self._simple_select_one(
+            table="rooms",
+            keyvalues={"room_id": room_id},
+            retcols=("room_id", "is_public", "creator"),
+            desc="get_room",
+            allow_none=True,
+        )
+
     def get_public_room_ids(self):
         return self._simple_select_onecol(
             table="rooms",
@@ -215,22 +231,6 @@ class RoomStore(RoomWorkerStore, SearchStore):
             logger.error("store_room with room_id=%s failed: %s", room_id, e)
             raise StoreError(500, "Problem creating room.")
 
-    def get_room(self, room_id):
-        """Retrieve a room.
-
-        Args:
-            room_id (str): The ID of the room to retrieve.
-        Returns:
-            A namedtuple containing the room information, or an empty list.
-        """
-        return self._simple_select_one(
-            table="rooms",
-            keyvalues={"room_id": room_id},
-            retcols=("room_id", "is_public", "creator"),
-            desc="get_room",
-            allow_none=True,
-        )
-
     @defer.inlineCallbacks
     def set_room_is_public(self, room_id, is_public):
         def set_room_is_public_txn(txn, next_id):