summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-07-26 13:31:59 +0100
committerErik Johnston <erik@matrix.org>2018-08-06 15:23:38 +0100
commit62ace05c4521caeed023e5b9dadd993afe5c154f (patch)
tree3b4cce70711b187077a266fac08243fb0739ef58
parentImport all functions from TransactionStore (diff)
downloadsynapse-62ace05c4521caeed023e5b9dadd993afe5c154f.tar.xz
Move necessary storage functions to worker classes
-rw-r--r--synapse/storage/events.py82
-rw-r--r--synapse/storage/events_worker.py84
-rw-r--r--synapse/storage/room.py32
3 files changed, 100 insertions, 98 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index e8e5a0fe44..5374796fd7 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1431,88 +1431,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
         )
 
     @defer.inlineCallbacks
-    def have_events_in_timeline(self, event_ids):
-        """Given a list of event ids, check if we have already processed and
-        stored them as non outliers.
-        """
-        rows = yield self._simple_select_many_batch(
-            table="events",
-            retcols=("event_id",),
-            column="event_id",
-            iterable=list(event_ids),
-            keyvalues={"outlier": False},
-            desc="have_events_in_timeline",
-        )
-
-        defer.returnValue(set(r["event_id"] for r in rows))
-
-    @defer.inlineCallbacks
-    def have_seen_events(self, event_ids):
-        """Given a list of event ids, check if we have already processed them.
-
-        Args:
-            event_ids (iterable[str]):
-
-        Returns:
-            Deferred[set[str]]: The events we have already seen.
-        """
-        results = set()
-
-        def have_seen_events_txn(txn, chunk):
-            sql = (
-                "SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
-                % (",".join("?" * len(chunk)), )
-            )
-            txn.execute(sql, chunk)
-            for (event_id, ) in txn:
-                results.add(event_id)
-
-        # break the input up into chunks of 100
-        input_iterator = iter(event_ids)
-        for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
-                          []):
-            yield self.runInteraction(
-                "have_seen_events",
-                have_seen_events_txn,
-                chunk,
-            )
-        defer.returnValue(results)
-
-    def get_seen_events_with_rejections(self, event_ids):
-        """Given a list of event ids, check if we rejected them.
-
-        Args:
-            event_ids (list[str])
-
-        Returns:
-            Deferred[dict[str, str|None):
-                Has an entry for each event id we already have seen. Maps to
-                the rejected reason string if we rejected the event, else maps
-                to None.
-        """
-        if not event_ids:
-            return defer.succeed({})
-
-        def f(txn):
-            sql = (
-                "SELECT e.event_id, reason FROM events as e "
-                "LEFT JOIN rejections as r ON e.event_id = r.event_id "
-                "WHERE e.event_id = ?"
-            )
-
-            res = {}
-            for event_id in event_ids:
-                txn.execute(sql, (event_id,))
-                row = txn.fetchone()
-                if row:
-                    _, rejected = row
-                    res[event_id] = rejected
-
-            return res
-
-        return self.runInteraction("get_rejection_reasons", f)
-
-    @defer.inlineCallbacks
     def count_daily_messages(self):
         """
         Returns an estimate of the number of messages sent in the last day.
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 9b4cfeb899..e1dc2c62fd 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -12,7 +12,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import itertools
 import logging
+
 from collections import namedtuple
 
 from canonicaljson import json
@@ -442,3 +444,85 @@ class EventsWorkerStore(SQLBaseStore):
             self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
 
         defer.returnValue(cache_entry)
+
+    @defer.inlineCallbacks
+    def have_events_in_timeline(self, event_ids):
+        """Given a list of event ids, check if we have already processed and
+        stored them as non outliers.
+        """
+        rows = yield self._simple_select_many_batch(
+            table="events",
+            retcols=("event_id",),
+            column="event_id",
+            iterable=list(event_ids),
+            keyvalues={"outlier": False},
+            desc="have_events_in_timeline",
+        )
+
+        defer.returnValue(set(r["event_id"] for r in rows))
+
+    @defer.inlineCallbacks
+    def have_seen_events(self, event_ids):
+        """Given a list of event ids, check if we have already processed them.
+
+        Args:
+            event_ids (iterable[str]):
+
+        Returns:
+            Deferred[set[str]]: The events we have already seen.
+        """
+        results = set()
+
+        def have_seen_events_txn(txn, chunk):
+            sql = (
+                "SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
+                % (",".join("?" * len(chunk)), )
+            )
+            txn.execute(sql, chunk)
+            for (event_id, ) in txn:
+                results.add(event_id)
+
+        # break the input up into chunks of 100
+        input_iterator = iter(event_ids)
+        for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
+                          []):
+            yield self.runInteraction(
+                "have_seen_events",
+                have_seen_events_txn,
+                chunk,
+            )
+        defer.returnValue(results)
+
+    def get_seen_events_with_rejections(self, event_ids):
+        """Given a list of event ids, check if we rejected them.
+
+        Args:
+            event_ids (list[str])
+
+        Returns:
+            Deferred[dict[str, str|None):
+                Has an entry for each event id we already have seen. Maps to
+                the rejected reason string if we rejected the event, else maps
+                to None.
+        """
+        if not event_ids:
+            return defer.succeed({})
+
+        def f(txn):
+            sql = (
+                "SELECT e.event_id, reason FROM events as e "
+                "LEFT JOIN rejections as r ON e.event_id = r.event_id "
+                "WHERE e.event_id = ?"
+            )
+
+            res = {}
+            for event_id in event_ids:
+                txn.execute(sql, (event_id,))
+                row = txn.fetchone()
+                if row:
+                    _, rejected = row
+                    res[event_id] = rejected
+
+            return res
+
+        return self.runInteraction("get_rejection_reasons", f)
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 3147fb6827..3378fc77d1 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -41,6 +41,22 @@ RatelimitOverride = collections.namedtuple(
 
 
 class RoomWorkerStore(SQLBaseStore):
+    def get_room(self, room_id):
+        """Retrieve a room.
+
+        Args:
+            room_id (str): The ID of the room to retrieve.
+        Returns:
+            A namedtuple containing the room information, or an empty list.
+        """
+        return self._simple_select_one(
+            table="rooms",
+            keyvalues={"room_id": room_id},
+            retcols=("room_id", "is_public", "creator"),
+            desc="get_room",
+            allow_none=True,
+        )
+
     def get_public_room_ids(self):
         return self._simple_select_onecol(
             table="rooms",
@@ -215,22 +231,6 @@ class RoomStore(RoomWorkerStore, SearchStore):
             logger.error("store_room with room_id=%s failed: %s", room_id, e)
             raise StoreError(500, "Problem creating room.")
 
-    def get_room(self, room_id):
-        """Retrieve a room.
-
-        Args:
-            room_id (str): The ID of the room to retrieve.
-        Returns:
-            A namedtuple containing the room information, or an empty list.
-        """
-        return self._simple_select_one(
-            table="rooms",
-            keyvalues={"room_id": room_id},
-            retcols=("room_id", "is_public", "creator"),
-            desc="get_room",
-            allow_none=True,
-        )
-
     @defer.inlineCallbacks
     def set_room_is_public(self, room_id, is_public):
         def set_room_is_public_txn(txn, next_id):