summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py23
-rw-r--r--synapse/storage/_base.py190
-rw-r--r--synapse/storage/event_federation.py101
-rw-r--r--synapse/storage/events.py75
-rw-r--r--synapse/storage/pusher.py4
-rw-r--r--synapse/storage/registration.py13
-rw-r--r--synapse/storage/roommember.py20
-rw-r--r--synapse/storage/schema/delta/24/stats_reporting.sql22
-rw-r--r--synapse/storage/signatures.py112
-rw-r--r--synapse/storage/state.py6
-rw-r--r--synapse/storage/stream.py42
11 files changed, 174 insertions, 434 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 77cb1dbd81..340e59afcb 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -54,7 +54,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 23
+SCHEMA_VERSION = 24
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
@@ -126,6 +126,27 @@ class DataStore(RoomMemberStore, RoomStore,
             lock=False,
         )
 
+    @defer.inlineCallbacks
+    def count_daily_users(self):
+        """
+        Counts the number of users who used this homeserver in the last 24 hours.
+        """
+        def _count_users(txn):
+            txn.execute(
+                "SELECT COUNT(DISTINCT user_id) AS users"
+                " FROM user_ips"
+                " WHERE last_seen > ?",
+                # This is close enough to a day for our purposes.
+                (int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),)
+            )
+            rows = self.cursor_to_dict(txn)
+            if rows:
+                return rows[0]["users"]
+            return 0
+
+        ret = yield self.runInteraction("count_users", _count_users)
+        defer.returnValue(ret)
+
     def get_user_ip_and_agents(self, user):
         return self._simple_select_list(
             table="user_ips",
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 495ef087c9..693784ad38 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -25,8 +25,6 @@ from util.id_generators import IdGenerator, StreamIdGenerator
 
 from twisted.internet import defer
 
-from collections import namedtuple
-
 import sys
 import time
 import threading
@@ -376,9 +374,6 @@ class SQLBaseStore(object):
 
         return self.runInteraction(desc, interaction)
 
-    def _execute_and_decode(self, desc, query, *args):
-        return self._execute(desc, self.cursor_to_dict, query, *args)
-
     # "Simple" SQL API methods that operate on a single table with no JOINs,
     # no complex WHERE clauses, just a dict of values for columns.
 
@@ -691,37 +686,6 @@ class SQLBaseStore(object):
 
         return dict(zip(retcols, row))
 
-    def _simple_selectupdate_one(self, table, keyvalues, updatevalues=None,
-                                 retcols=None, allow_none=False,
-                                 desc="_simple_selectupdate_one"):
-        """ Combined SELECT then UPDATE."""
-        def func(txn):
-            ret = None
-            if retcols:
-                ret = self._simple_select_one_txn(
-                    txn,
-                    table=table,
-                    keyvalues=keyvalues,
-                    retcols=retcols,
-                    allow_none=allow_none,
-                )
-
-            if updatevalues:
-                self._simple_update_one_txn(
-                    txn,
-                    table=table,
-                    keyvalues=keyvalues,
-                    updatevalues=updatevalues,
-                )
-
-                # if txn.rowcount == 0:
-                #     raise StoreError(404, "No row found")
-                if txn.rowcount > 1:
-                    raise StoreError(500, "More than one row matched")
-
-            return ret
-        return self.runInteraction(desc, func)
-
     def _simple_delete_one(self, table, keyvalues, desc="_simple_delete_one"):
         """Executes a DELETE query on the named table, expecting to delete a
         single row.
@@ -743,16 +707,6 @@ class SQLBaseStore(object):
                 raise StoreError(500, "more than one row matched")
         return self.runInteraction(desc, func)
 
-    def _simple_delete(self, table, keyvalues, desc="_simple_delete"):
-        """Executes a DELETE query on the named table.
-
-        Args:
-            table : string giving the table name
-            keyvalues : dict of column names and values to select the row with
-        """
-
-        return self.runInteraction(desc, self._simple_delete_txn)
-
     def _simple_delete_txn(self, txn, table, keyvalues):
         sql = "DELETE FROM %s WHERE %s" % (
             table,
@@ -761,24 +715,6 @@ class SQLBaseStore(object):
 
         return txn.execute(sql, keyvalues.values())
 
-    def _simple_max_id(self, table):
-        """Executes a SELECT query on the named table, expecting to return the
-        max value for the column "id".
-
-        Args:
-            table : string giving the table name
-        """
-        sql = "SELECT MAX(id) AS id FROM %s" % table
-
-        def func(txn):
-            txn.execute(sql)
-            max_id = self.cursor_to_dict(txn)[0]["id"]
-            if max_id is None:
-                return 0
-            return max_id
-
-        return self.runInteraction("_simple_max_id", func)
-
     def get_next_stream_id(self):
         with self._next_stream_id_lock:
             i = self._next_stream_id
@@ -791,129 +727,3 @@ class _RollbackButIsFineException(Exception):
     something went wrong.
     """
     pass
-
-
-class Table(object):
-    """ A base class used to store information about a particular table.
-    """
-
-    table_name = None
-    """ str: The name of the table """
-
-    fields = None
-    """ list: The field names """
-
-    EntryType = None
-    """ Type: A tuple type used to decode the results """
-
-    _select_where_clause = "SELECT %s FROM %s WHERE %s"
-    _select_clause = "SELECT %s FROM %s"
-    _insert_clause = "REPLACE INTO %s (%s) VALUES (%s)"
-
-    @classmethod
-    def select_statement(cls, where_clause=None):
-        """
-        Args:
-            where_clause (str): The WHERE clause to use.
-
-        Returns:
-            str: An SQL statement to select rows from the table with the given
-            WHERE clause.
-        """
-        if where_clause:
-            return cls._select_where_clause % (
-                ", ".join(cls.fields),
-                cls.table_name,
-                where_clause
-            )
-        else:
-            return cls._select_clause % (
-                ", ".join(cls.fields),
-                cls.table_name,
-            )
-
-    @classmethod
-    def insert_statement(cls):
-        return cls._insert_clause % (
-            cls.table_name,
-            ", ".join(cls.fields),
-            ", ".join(["?"] * len(cls.fields)),
-        )
-
-    @classmethod
-    def decode_single_result(cls, results):
-        """ Given an iterable of tuples, return a single instance of
-            `EntryType` or None if the iterable is empty
-        Args:
-            results (list): The results list to convert to `EntryType`
-        Returns:
-            EntryType: An instance of `EntryType`
-        """
-        results = list(results)
-        if results:
-            return cls.EntryType(*results[0])
-        else:
-            return None
-
-    @classmethod
-    def decode_results(cls, results):
-        """ Given an iterable of tuples, return a list of `EntryType`
-        Args:
-            results (list): The results list to convert to `EntryType`
-
-        Returns:
-            list: A list of `EntryType`
-        """
-        return [cls.EntryType(*row) for row in results]
-
-    @classmethod
-    def get_fields_string(cls, prefix=None):
-        if prefix:
-            to_join = ("%s.%s" % (prefix, f) for f in cls.fields)
-        else:
-            to_join = cls.fields
-
-        return ", ".join(to_join)
-
-
-class JoinHelper(object):
-    """ Used to help do joins on tables by looking at the tables' fields and
-    creating a list of unique fields to use with SELECTs and a namedtuple
-    to dump the results into.
-
-    Attributes:
-        tables (list): List of `Table` classes
-        EntryType (type)
-    """
-
-    def __init__(self, *tables):
-        self.tables = tables
-
-        res = []
-        for table in self.tables:
-            res += [f for f in table.fields if f not in res]
-
-        self.EntryType = namedtuple("JoinHelperEntry", res)
-
-    def get_fields(self, **prefixes):
-        """Get a string representing a list of fields for use in SELECT
-        statements with the given prefixes applied to each.
-
-        For example::
-
-            JoinHelper(PdusTable, StateTable).get_fields(
-                PdusTable="pdus",
-                StateTable="state"
-            )
-        """
-        res = []
-        for field in self.EntryType._fields:
-            for table in self.tables:
-                if field in table.fields:
-                    res.append("%s.%s" % (prefixes[table.__name__], field))
-                    break
-
-        return ", ".join(res)
-
-    def decode_results(self, rows):
-        return [self.EntryType(*row) for row in rows]
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 989ad340b0..6d4421dd8f 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -154,98 +154,6 @@ class EventFederationStore(SQLBaseStore):
 
         return results
 
-    def _get_latest_state_in_room(self, txn, room_id, type, state_key):
-        event_ids = self._simple_select_onecol_txn(
-            txn,
-            table="state_forward_extremities",
-            keyvalues={
-                "room_id": room_id,
-                "type": type,
-                "state_key": state_key,
-            },
-            retcol="event_id",
-        )
-
-        results = []
-        for event_id in event_ids:
-            hashes = self._get_event_reference_hashes_txn(txn, event_id)
-            prev_hashes = {
-                k: encode_base64(v) for k, v in hashes.items()
-                if k == "sha256"
-            }
-            results.append((event_id, prev_hashes))
-
-        return results
-
-    def _get_prev_events(self, txn, event_id):
-        results = self._get_prev_events_and_state(
-            txn,
-            event_id,
-            is_state=0,
-        )
-
-        return [(e_id, h, ) for e_id, h, _ in results]
-
-    def _get_prev_state(self, txn, event_id):
-        results = self._get_prev_events_and_state(
-            txn,
-            event_id,
-            is_state=True,
-        )
-
-        return [(e_id, h, ) for e_id, h, _ in results]
-
-    def _get_prev_events_and_state(self, txn, event_id, is_state=None):
-        keyvalues = {
-            "event_id": event_id,
-        }
-
-        if is_state is not None:
-            keyvalues["is_state"] = bool(is_state)
-
-        res = self._simple_select_list_txn(
-            txn,
-            table="event_edges",
-            keyvalues=keyvalues,
-            retcols=["prev_event_id", "is_state"],
-        )
-
-        hashes = self._get_prev_event_hashes_txn(txn, event_id)
-
-        results = []
-        for d in res:
-            edge_hash = self._get_event_reference_hashes_txn(txn, d["prev_event_id"])
-            edge_hash.update(hashes.get(d["prev_event_id"], {}))
-            prev_hashes = {
-                k: encode_base64(v)
-                for k, v in edge_hash.items()
-                if k == "sha256"
-            }
-            results.append((d["prev_event_id"], prev_hashes, d["is_state"]))
-
-        return results
-
-    def _get_auth_events(self, txn, event_id):
-        auth_ids = self._simple_select_onecol_txn(
-            txn,
-            table="event_auth",
-            keyvalues={
-                "event_id": event_id,
-            },
-            retcol="auth_id",
-        )
-
-        results = []
-        for auth_id in auth_ids:
-            hashes = self._get_event_reference_hashes_txn(txn, auth_id)
-            prev_hashes = {
-                k: encode_base64(v) for k, v in hashes.items()
-                if k == "sha256"
-            }
-            results.append((auth_id, prev_hashes))
-
-        return results
-
     def get_min_depth(self, room_id):
         """ For hte given room, get the minimum depth we have seen for it.
         """
@@ -303,6 +211,15 @@ class EventFederationStore(SQLBaseStore):
             ],
         )
 
+        self._update_extremeties(txn, events)
+
+    def _update_extremeties(self, txn, events):
+        """Updates the event_*_extremities tables based on the new/updated
+        events being persisted.
+
+        This is called for new events *and* for events that were outliers, but
+        are are now being persisted as non-outliers.
+        """
         events_by_room = {}
         for ev in events:
             events_by_room.setdefault(ev.room_id, []).append(ev)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index fba837f461..416ef6af93 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -12,7 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 from _base import SQLBaseStore, _RollbackButIsFineException
 
 from twisted.internet import defer, reactor
@@ -28,6 +27,7 @@ from canonicaljson import encode_canonical_json
 from contextlib import contextmanager
 
 import logging
+import math
 import ujson as json
 
 logger = logging.getLogger(__name__)
@@ -281,6 +281,8 @@ class EventsStore(SQLBaseStore):
                     (False, event.event_id,)
                 )
 
+                self._update_extremeties(txn, [event])
+
         events_and_contexts = filter(
             lambda ec: ec[0] not in to_remove,
             events_and_contexts
@@ -888,18 +890,69 @@ class EventsStore(SQLBaseStore):
 
         return ev
 
-    def _parse_events(self, rows):
-        return self.runInteraction(
-            "_parse_events", self._parse_events_txn, rows
-        )
-
     def _parse_events_txn(self, txn, rows):
         event_ids = [r["event_id"] for r in rows]
 
         return self._get_events_txn(txn, event_ids)
 
-    def _has_been_redacted_txn(self, txn, event):
-        sql = "SELECT event_id FROM redactions WHERE redacts = ?"
-        txn.execute(sql, (event.event_id,))
-        result = txn.fetchone()
-        return result[0] if result else None
+    @defer.inlineCallbacks
+    def count_daily_messages(self):
+        """
+        Returns an estimate of the number of messages sent in the last day.
+
+        If it has been significantly less or more than one day since the last
+        call to this function, it will return None.
+        """
+        def _count_messages(txn):
+            now = self.hs.get_clock().time()
+
+            txn.execute(
+                "SELECT reported_stream_token, reported_time FROM stats_reporting"
+            )
+            last_reported = self.cursor_to_dict(txn)
+
+            txn.execute(
+                "SELECT stream_ordering"
+                " FROM events"
+                " ORDER BY stream_ordering DESC"
+                " LIMIT 1"
+            )
+            now_reporting = self.cursor_to_dict(txn)
+            if not now_reporting:
+                return None
+            now_reporting = now_reporting[0]["stream_ordering"]
+
+            txn.execute("DELETE FROM stats_reporting")
+            txn.execute(
+                "INSERT INTO stats_reporting"
+                " (reported_stream_token, reported_time)"
+                " VALUES (?, ?)",
+                (now_reporting, now,)
+            )
+
+            if not last_reported:
+                return None
+
+            # Close enough to correct for our purposes.
+            yesterday = (now - 24 * 60 * 60)
+            if math.fabs(yesterday - last_reported[0]["reported_time"]) > 60 * 60:
+                return None
+
+            txn.execute(
+                "SELECT COUNT(*) as messages"
+                " FROM events NATURAL JOIN event_json"
+                " WHERE json like '%m.room.message%'"
+                " AND stream_ordering > ?"
+                " AND stream_ordering <= ?",
+                (
+                    last_reported[0]["reported_stream_token"],
+                    now_reporting,
+                )
+            )
+            rows = self.cursor_to_dict(txn)
+            if not rows:
+                return None
+            return rows[0]["messages"]
+
+        ret = yield self.runInteraction("count_messages", _count_messages)
+        defer.returnValue(ret)
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 00b748f131..345c4e1104 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import SQLBaseStore, Table
+from ._base import SQLBaseStore
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
@@ -149,5 +149,5 @@ class PusherStore(SQLBaseStore):
         )
 
 
-class PushersTable(Table):
+class PushersTable(object):
     table_name = "pushers"
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index c9ceb132ae..b454dd5b3a 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -289,3 +289,16 @@ class RegistrationStore(SQLBaseStore):
         if ret:
             defer.returnValue(ret['user_id'])
         defer.returnValue(None)
+
+    @defer.inlineCallbacks
+    def count_all_users(self):
+        """Counts all users registered on the homeserver."""
+        def _count_users(txn):
+            txn.execute("SELECT COUNT(*) AS users FROM users")
+            rows = self.cursor_to_dict(txn)
+            if rows:
+                return rows[0]["users"]
+            return 0
+
+        ret = yield self.runInteraction("count_users", _count_users)
+        defer.returnValue(ret)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 8eee2dfbcc..8c40d9a8a6 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
 
 RoomsForUser = namedtuple(
     "RoomsForUser",
-    ("room_id", "sender", "membership")
+    ("room_id", "sender", "membership", "event_id", "stream_ordering")
 )
 
 
@@ -141,11 +141,13 @@ class RoomMemberStore(SQLBaseStore):
         args.extend(membership_list)
 
         sql = (
-            "SELECT m.room_id, m.sender, m.membership"
-            " FROM room_memberships as m"
-            " INNER JOIN current_state_events as c"
-            " ON m.event_id = c.event_id "
-            " AND m.room_id = c.room_id "
+            "SELECT m.room_id, m.sender, m.membership, m.event_id, e.stream_ordering"
+            " FROM current_state_events as c"
+            " INNER JOIN room_memberships as m"
+            " ON m.event_id = c.event_id"
+            " INNER JOIN events as e"
+            " ON e.event_id = c.event_id"
+            " AND m.room_id = c.room_id"
             " AND m.user_id = c.state_key"
             " WHERE %s"
         ) % (where_clause,)
@@ -176,12 +178,6 @@ class RoomMemberStore(SQLBaseStore):
 
         return joined_domains
 
-    def _get_members_query(self, where_clause, where_values):
-        return self.runInteraction(
-            "get_members_query", self._get_members_events_txn,
-            where_clause, where_values
-        ).addCallbacks(self._get_events)
-
     def _get_members_events_txn(self, txn, room_id, membership=None, user_id=None):
         rows = self._get_members_rows_txn(
             txn,
diff --git a/synapse/storage/schema/delta/24/stats_reporting.sql b/synapse/storage/schema/delta/24/stats_reporting.sql
new file mode 100644
index 0000000000..e9165d2917
--- /dev/null
+++ b/synapse/storage/schema/delta/24/stats_reporting.sql
@@ -0,0 +1,22 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Should only ever contain one row
+CREATE TABLE IF NOT EXISTS stats_reporting(
+  -- The stream ordering token which was most recently reported as stats
+  reported_stream_token INTEGER,
+  -- The time (seconds since epoch) stats were most recently reported
+  reported_time BIGINT
+);
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index ab57b92174..b070be504d 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -24,41 +24,6 @@ from synapse.crypto.event_signing import compute_event_reference_hash
 class SignatureStore(SQLBaseStore):
     """Persistence for event signatures and hashes"""
 
-    def _get_event_content_hashes_txn(self, txn, event_id):
-        """Get all the hashes for a given Event.
-        Args:
-            txn (cursor):
-            event_id (str): Id for the Event.
-        Returns:
-            A dict of algorithm -> hash.
-        """
-        query = (
-            "SELECT algorithm, hash"
-            " FROM event_content_hashes"
-            " WHERE event_id = ?"
-        )
-        txn.execute(query, (event_id, ))
-        return dict(txn.fetchall())
-
-    def _store_event_content_hash_txn(self, txn, event_id, algorithm,
-                                      hash_bytes):
-        """Store a hash for a Event
-        Args:
-            txn (cursor):
-            event_id (str): Id for the Event.
-            algorithm (str): Hashing algorithm.
-            hash_bytes (bytes): Hash function output bytes.
-        """
-        self._simple_insert_txn(
-            txn,
-            "event_content_hashes",
-            {
-                "event_id": event_id,
-                "algorithm": algorithm,
-                "hash": buffer(hash_bytes),
-            },
-        )
-
     def get_event_reference_hashes(self, event_ids):
         def f(txn):
             return [
@@ -123,80 +88,3 @@ class SignatureStore(SQLBaseStore):
             table="event_reference_hashes",
             values=vals,
         )
-
-    def _get_event_signatures_txn(self, txn, event_id):
-        """Get all the signatures for a given PDU.
-        Args:
-            txn (cursor):
-            event_id (str): Id for the Event.
-        Returns:
-            A dict of sig name -> dict(key_id -> signature_bytes)
-        """
-        query = (
-            "SELECT signature_name, key_id, signature"
-            " FROM event_signatures"
-            " WHERE event_id = ? "
-        )
-        txn.execute(query, (event_id, ))
-        rows = txn.fetchall()
-
-        res = {}
-
-        for name, key, sig in rows:
-            res.setdefault(name, {})[key] = sig
-
-        return res
-
-    def _store_event_signature_txn(self, txn, event_id, signature_name, key_id,
-                                   signature_bytes):
-        """Store a signature from the origin server for a PDU.
-        Args:
-            txn (cursor):
-            event_id (str): Id for the Event.
-            origin (str): origin of the Event.
-            key_id (str): Id for the signing key.
-            signature (bytes): The signature.
-        """
-        self._simple_insert_txn(
-            txn,
-            "event_signatures",
-            {
-                "event_id": event_id,
-                "signature_name": signature_name,
-                "key_id": key_id,
-                "signature": buffer(signature_bytes),
-            },
-        )
-
-    def _get_prev_event_hashes_txn(self, txn, event_id):
-        """Get all the hashes for previous PDUs of a PDU
-        Args:
-            txn (cursor):
-            event_id (str): Id for the Event.
-        Returns:
-            dict of (pdu_id, origin) -> dict of algorithm -> hash_bytes.
-        """
-        query = (
-            "SELECT prev_event_id, algorithm, hash"
-            " FROM event_edge_hashes"
-            " WHERE event_id = ?"
-        )
-        txn.execute(query, (event_id, ))
-        results = {}
-        for prev_event_id, algorithm, hash_bytes in txn.fetchall():
-            hashes = results.setdefault(prev_event_id, {})
-            hashes[algorithm] = hash_bytes
-        return results
-
-    def _store_prev_event_hash_txn(self, txn, event_id, prev_event_id,
-                                   algorithm, hash_bytes):
-        self._simple_insert_txn(
-            txn,
-            "event_edge_hashes",
-            {
-                "event_id": event_id,
-                "prev_event_id": prev_event_id,
-                "algorithm": algorithm,
-                "hash": buffer(hash_bytes),
-            },
-        )
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 9630efcfcc..e935b9443b 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -20,8 +20,6 @@ from synapse.util.caches.descriptors import (
 
 from twisted.internet import defer
 
-from synapse.util.stringutils import random_string
-
 import logging
 
 logger = logging.getLogger(__name__)
@@ -428,7 +426,3 @@ class StateStore(SQLBaseStore):
             }
 
         defer.returnValue(results)
-
-
-def _make_group_id(clock):
-    return str(int(clock.time_msec())) + random_string(5)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index d7fe423f5a..3cab06fdef 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -159,9 +159,7 @@ class StreamStore(SQLBaseStore):
 
     @log_function
     def get_room_events_stream(self, user_id, from_key, to_key, room_id,
-                               limit=0, with_feedback=False):
-        # TODO (erikj): Handle compressed feedback
-
+                               limit=0):
         current_room_membership_sql = (
             "SELECT m.room_id FROM room_memberships as m "
             " INNER JOIN current_state_events as c"
@@ -227,10 +225,7 @@ class StreamStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def paginate_room_events(self, room_id, from_key, to_key=None,
-                             direction='b', limit=-1,
-                             with_feedback=False):
-        # TODO (erikj): Handle compressed feedback
-
+                             direction='b', limit=-1):
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
         # we have a bit of asymmetry when it comes to equalities.
@@ -302,7 +297,6 @@ class StreamStore(SQLBaseStore):
 
     @cachedInlineCallbacks(num_args=4)
     def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
-        # TODO (erikj): Handle compressed feedback
 
         end_token = RoomStreamToken.parse_stream_token(end_token)
 
@@ -379,6 +373,38 @@ class StreamStore(SQLBaseStore):
             )
             defer.returnValue("t%d-%d" % (topo, token))
 
+    def get_stream_token_for_event(self, event_id):
+        """The stream token for an event
+        Args:
+            event_id(str): The id of the event to look up a stream token for.
+        Raises:
+            StoreError if the event wasn't in the database.
+        Returns:
+            A deferred "s%d" stream token.
+        """
+        return self._simple_select_one_onecol(
+            table="events",
+            keyvalues={"event_id": event_id},
+            retcol="stream_ordering",
+        ).addCallback(lambda row: "s%d" % (row,))
+
+    def get_topological_token_for_event(self, event_id):
+        """The stream token for an event
+        Args:
+            event_id(str): The id of the event to look up a stream token for.
+        Raises:
+            StoreError if the event wasn't in the database.
+        Returns:
+            A deferred "t%d-%d" topological token.
+        """
+        return self._simple_select_one(
+            table="events",
+            keyvalues={"event_id": event_id},
+            retcols=("stream_ordering", "topological_ordering"),
+        ).addCallback(lambda row: "t%d-%d" % (
+            row["topological_ordering"], row["stream_ordering"],)
+        )
+
     def _get_max_topological_txn(self, txn):
         txn.execute(
             "SELECT MAX(topological_ordering) FROM events"