summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/appservice/api.py5
-rw-r--r--synapse/handlers/federation.py5
-rw-r--r--synapse/handlers/message.py29
-rw-r--r--synapse/handlers/register.py5
-rw-r--r--synapse/handlers/room.py15
-rw-r--r--synapse/handlers/room_member.py29
-rw-r--r--synapse/storage/_base.py6
-rw-r--r--synapse/storage/prepare_database.py12
-rw-r--r--synapse/storage/presence.py10
-rw-r--r--synapse/storage/roommember.py33
-rw-r--r--synapse/storage/stream.py90
-rw-r--r--synapse/util/__init__.py3
-rw-r--r--synapse/util/distributor.py22
-rw-r--r--synapse/util/ratelimitutils.py14
-rw-r--r--synapse/util/stringutils.py4
-rw-r--r--tests/storage/test_presence.py27
-rw-r--r--tests/storage/test_redaction.py51
-rw-r--r--tests/storage/test_roommember.py7
-rw-r--r--tests/storage/test_stream.py185
19 files changed, 28 insertions, 524 deletions
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index bc90605324..6da6a1b62e 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -100,11 +100,6 @@ class ApplicationServiceApi(SimpleHttpClient):
             logger.warning("push_bulk to %s threw exception %s", uri, ex)
         defer.returnValue(False)
 
-    @defer.inlineCallbacks
-    def push(self, service, event, txn_id=None):
-        response = yield self.push_bulk(service, [event], txn_id)
-        defer.returnValue(response)
-
     def _serialize(self, events):
         time_now = self.clock.time_msec()
         return [
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index eb02f0e000..c28226f840 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -40,6 +40,7 @@ from synapse.events.utils import prune_event
 from synapse.util.retryutils import NotRetryingDestination
 
 from synapse.push.action_generator import ActionGenerator
+from synapse.util.distributor import user_joined_room
 
 from twisted.internet import defer
 
@@ -49,10 +50,6 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-def user_joined_room(distributor, user, room_id):
-    return distributor.fire("user_joined_room", user, room_id)
-
-
 class FederationHandler(BaseHandler):
     """Handles events that originated from federation.
         Responsible for:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index fa78d4acec..f51feda2f4 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -45,35 +45,6 @@ class MessageHandler(BaseHandler):
         self.snapshot_cache = SnapshotCache()
 
     @defer.inlineCallbacks
-    def get_message(self, msg_id=None, room_id=None, sender_id=None,
-                    user_id=None):
-        """ Retrieve a message.
-
-        Args:
-            msg_id (str): The message ID to obtain.
-            room_id (str): The room where the message resides.
-            sender_id (str): The user ID of the user who sent the message.
-            user_id (str): The user ID of the user making this request.
-        Returns:
-            The message, or None if no message exists.
-        Raises:
-            SynapseError if something went wrong.
-        """
-        yield self.auth.check_joined_room(room_id, user_id)
-
-        # Pull out the message from the db
-#        msg = yield self.store.get_message(
-#            room_id=room_id,
-#            msg_id=msg_id,
-#            user_id=sender_id
-#        )
-
-        # TODO (erikj): Once we work out the correct c-s api we need to think
-        # on how to do this.
-
-        defer.returnValue(None)
-
-    @defer.inlineCallbacks
     def get_messages(self, requester, room_id=None, pagin_config=None,
                      as_client_event=True):
         """Get messages in a room.
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index f287ee247b..b0862067e1 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -23,6 +23,7 @@ from synapse.api.errors import (
 from ._base import BaseHandler
 from synapse.util.async import run_on_reactor
 from synapse.http.client import CaptchaServerHttpClient
+from synapse.util.distributor import registered_user
 
 import logging
 import urllib
@@ -30,10 +31,6 @@ import urllib
 logger = logging.getLogger(__name__)
 
 
-def registered_user(distributor, user):
-    return distributor.fire("registered_user", user)
-
-
 class RegistrationHandler(BaseHandler):
 
     def __init__(self, hs):
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 3e1d9282d7..ea306cd42a 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -25,7 +25,6 @@ from synapse.api.constants import (
 from synapse.api.errors import AuthError, StoreError, SynapseError
 from synapse.util import stringutils
 from synapse.util.async import concurrently_execute
-from synapse.util.logcontext import preserve_context_over_fn
 from synapse.util.caches.response_cache import ResponseCache
 
 from collections import OrderedDict
@@ -39,20 +38,6 @@ logger = logging.getLogger(__name__)
 id_server_scheme = "https://"
 
 
-def user_left_room(distributor, user, room_id):
-    return preserve_context_over_fn(
-        distributor.fire,
-        "user_left_room", user=user, room_id=room_id
-    )
-
-
-def user_joined_room(distributor, user, room_id):
-    return preserve_context_over_fn(
-        distributor.fire,
-        "user_joined_room", user=user, room_id=room_id
-    )
-
-
 class RoomCreationHandler(BaseHandler):
 
     PRESETS_DICT = {
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index b6ef3c91af..b69f36aefe 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -23,8 +23,8 @@ from synapse.api.constants import (
     EventTypes, Membership,
 )
 from synapse.api.errors import AuthError, SynapseError, Codes
-from synapse.util.logcontext import preserve_context_over_fn
 from synapse.util.async import Linearizer
+from synapse.util.distributor import user_left_room, user_joined_room
 
 from signedjson.sign import verify_signed_json
 from signedjson.key import decode_verify_key_bytes
@@ -38,20 +38,6 @@ logger = logging.getLogger(__name__)
 id_server_scheme = "https://"
 
 
-def user_left_room(distributor, user, room_id):
-    return preserve_context_over_fn(
-        distributor.fire,
-        "user_left_room", user=user, room_id=room_id
-    )
-
-
-def user_joined_room(distributor, user, room_id):
-    return preserve_context_over_fn(
-        distributor.fire,
-        "user_joined_room", user=user, room_id=room_id
-    )
-
-
 class RoomMemberHandler(BaseHandler):
     # TODO(paul): This handler currently contains a messy conflation of
     #   low-level API that works on UserID objects and so on, and REST-level
@@ -406,19 +392,6 @@ class RoomMemberHandler(BaseHandler):
             and guest_access.content["guest_access"] == "can_join"
         )
 
-    def _should_do_dance(self, current_state, inviter, room_hosts=None):
-        # TODO: Shouldn't this be remote_room_host?
-        room_hosts = room_hosts or []
-
-        is_host_in_room = self.is_host_in_room(current_state)
-        if is_host_in_room:
-            return False, room_hosts
-
-        if inviter and not self.hs.is_mine(inviter):
-            room_hosts.append(inviter.domain)
-
-        return True, room_hosts
-
     @defer.inlineCallbacks
     def lookup_room_alias(self, room_alias):
         """
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 04d7fcf6d6..1e27c2c0ce 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -810,12 +810,6 @@ class SQLBaseStore(object):
 
         return txn.execute(sql, keyvalues.values())
 
-    def get_next_stream_id(self):
-        with self._next_stream_id_lock:
-            i = self._next_stream_id
-            self._next_stream_id += 1
-            return i
-
     def _get_cache_dict(self, db_conn, table, entity_column, stream_column,
                         max_value):
         # Fetch a mapping of room_id -> max stream position for "recent" rooms.
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 00833422af..57f14fd12b 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -30,18 +30,6 @@ SCHEMA_VERSION = 31
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
 
-def read_schema(path):
-    """ Read the named database schema.
-
-    Args:
-        path: Path of the database schema.
-    Returns:
-        A string containing the database schema.
-    """
-    with open(path) as schema_file:
-        return schema_file.read()
-
-
 class PrepareDatabaseException(Exception):
     pass
 
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 59b4ef5ce6..07f5fae8dd 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -176,16 +176,6 @@ class PresenceStore(SQLBaseStore):
             desc="disallow_presence_visible",
         )
 
-    def is_presence_visible(self, observed_localpart, observer_userid):
-        return self._simple_select_one(
-            table="presence_allow_inbound",
-            keyvalues={"observed_user_id": observed_localpart,
-                       "observer_user_id": observer_userid},
-            retcols=["observed_user_id"],
-            allow_none=True,
-            desc="is_presence_visible",
-        )
-
     def add_presence_list_pending(self, observer_localpart, observed_userid):
         return self._simple_insert(
             table="presence_list",
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 66e7a40e3c..77518e893f 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -121,26 +121,6 @@ class RoomMemberStore(SQLBaseStore):
         with self._stream_id_gen.get_next() as stream_ordering:
             yield self.runInteraction("locally_reject_invite", f, stream_ordering)
 
-    def get_room_member(self, user_id, room_id):
-        """Retrieve the current state of a room member.
-
-        Args:
-            user_id (str): The member's user ID.
-            room_id (str): The room the member is in.
-        Returns:
-            Deferred: Results in a MembershipEvent or None.
-        """
-        return self.runInteraction(
-            "get_room_member",
-            self._get_members_events_txn,
-            room_id,
-            user_id=user_id,
-        ).addCallback(
-            self._get_events
-        ).addCallback(
-            lambda events: events[0] if events else None
-        )
-
     @cached(max_entries=5000)
     def get_users_in_room(self, room_id):
         def f(txn):
@@ -203,19 +183,6 @@ class RoomMemberStore(SQLBaseStore):
                 defer.returnValue(invite)
         defer.returnValue(None)
 
-    def get_leave_and_ban_events_for_user(self, user_id):
-        """ Get all the leave events for a user
-        Args:
-            user_id (str): The user ID.
-        Returns:
-            A deferred list of event objects.
-        """
-        return self.get_rooms_for_user_where_membership_is(
-            user_id, (Membership.LEAVE, Membership.BAN)
-        ).addCallback(lambda leaves: self._get_events([
-            leave.event_id for leave in leaves
-        ]))
-
     def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
         """ Get all the rooms for this user where the membership for this user
         matches one in the membership list.
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 76bcd9cd00..95b12559a6 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -303,96 +303,6 @@ class StreamStore(SQLBaseStore):
 
         defer.returnValue(ret)
 
-    def get_room_events_stream(
-        self,
-        user_id,
-        from_key,
-        to_key,
-        limit=0,
-        is_guest=False,
-        room_ids=None
-    ):
-        room_ids = room_ids or []
-        room_ids = [r for r in room_ids]
-        if is_guest:
-            current_room_membership_sql = (
-                "SELECT c.room_id FROM history_visibility AS h"
-                " INNER JOIN current_state_events AS c"
-                " ON h.event_id = c.event_id"
-                " WHERE c.room_id IN (%s)"
-                " AND h.history_visibility = 'world_readable'" % (
-                    ",".join(map(lambda _: "?", room_ids))
-                )
-            )
-            current_room_membership_args = room_ids
-        else:
-            current_room_membership_sql = (
-                "SELECT m.room_id FROM room_memberships as m "
-                " INNER JOIN current_state_events as c"
-                " ON m.event_id = c.event_id AND c.state_key = m.user_id"
-                " WHERE m.user_id = ? AND m.membership = 'join'"
-            )
-            current_room_membership_args = [user_id]
-
-        # We also want to get any membership events about that user, e.g.
-        # invites or leave notifications.
-        membership_sql = (
-            "SELECT m.event_id FROM room_memberships as m "
-            "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
-            "WHERE m.user_id = ? "
-        )
-        membership_args = [user_id]
-
-        if limit:
-            limit = max(limit, MAX_STREAM_SIZE)
-        else:
-            limit = MAX_STREAM_SIZE
-
-        # From and to keys should be integers from ordering.
-        from_id = RoomStreamToken.parse_stream_token(from_key)
-        to_id = RoomStreamToken.parse_stream_token(to_key)
-
-        if from_key == to_key:
-            return defer.succeed(([], to_key))
-
-        sql = (
-            "SELECT e.event_id, e.stream_ordering FROM events AS e WHERE "
-            "(e.outlier = ? AND (room_id IN (%(current)s)) OR "
-            "(event_id IN (%(invites)s))) "
-            "AND e.stream_ordering > ? AND e.stream_ordering <= ? "
-            "ORDER BY stream_ordering ASC LIMIT %(limit)d "
-        ) % {
-            "current": current_room_membership_sql,
-            "invites": membership_sql,
-            "limit": limit
-        }
-
-        def f(txn):
-            args = ([False] + current_room_membership_args + membership_args +
-                    [from_id.stream, to_id.stream])
-            txn.execute(sql, args)
-
-            rows = self.cursor_to_dict(txn)
-
-            ret = self._get_events_txn(
-                txn,
-                [r["event_id"] for r in rows],
-                get_prev_content=True
-            )
-
-            self._set_before_and_after(ret, rows)
-
-            if rows:
-                key = "s%d" % max(r["stream_ordering"] for r in rows)
-            else:
-                # Assume we didn't get anything because there was nothing to
-                # get.
-                key = to_key
-
-            return ret, key
-
-        return self.runInteraction("get_room_events_stream", f)
-
     @defer.inlineCallbacks
     def paginate_room_events(self, room_id, from_key, to_key=None,
                              direction='b', limit=-1):
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 3b9da5b34a..b462495eb8 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -49,9 +49,6 @@ class Clock(object):
         l.start(msec / 1000.0, now=False)
         return l
 
-    def stop_looping_call(self, loop):
-        loop.stop()
-
     def call_later(self, delay, callback, *args, **kwargs):
         """Call something later
 
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index 8875813de4..d7cccc06b1 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -15,7 +15,9 @@
 
 from twisted.internet import defer
 
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.logcontext import (
+    PreserveLoggingContext, preserve_context_over_fn
+)
 
 from synapse.util import unwrapFirstError
 
@@ -25,6 +27,24 @@ import logging
 logger = logging.getLogger(__name__)
 
 
+def registered_user(distributor, user):
+    return distributor.fire("registered_user", user)
+
+
+def user_left_room(distributor, user, room_id):
+    return preserve_context_over_fn(
+        distributor.fire,
+        "user_left_room", user=user, room_id=room_id
+    )
+
+
+def user_joined_room(distributor, user, room_id):
+    return preserve_context_over_fn(
+        distributor.fire,
+        "user_joined_room", user=user, room_id=room_id
+    )
+
+
 class Distributor(object):
     """A central dispatch point for loosely-connected pieces of code to
     register, observe, and fire signals.
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
index 4076eed269..1101881a2d 100644
--- a/synapse/util/ratelimitutils.py
+++ b/synapse/util/ratelimitutils.py
@@ -100,20 +100,6 @@ class _PerHostRatelimiter(object):
         self.current_processing = set()
         self.request_times = []
 
-    def is_empty(self):
-        time_now = self.clock.time_msec()
-        self.request_times[:] = [
-            r for r in self.request_times
-            if time_now - r < self.window_size
-        ]
-
-        return not (
-            self.ready_request_queue
-            or self.sleeping_requests
-            or self.current_processing
-            or self.request_times
-        )
-
     @contextlib.contextmanager
     def ratelimit(self):
         # `contextlib.contextmanager` takes a generator and turns it into a
diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py
index b490bb8725..a100f151d4 100644
--- a/synapse/util/stringutils.py
+++ b/synapse/util/stringutils.py
@@ -21,10 +21,6 @@ _string_with_symbols = (
 )
 
 
-def origin_from_ucid(ucid):
-    return ucid.split("@", 1)[1]
-
-
 def random_string(length):
     return ''.join(random.choice(string.ascii_letters) for _ in xrange(length))
 
diff --git a/tests/storage/test_presence.py b/tests/storage/test_presence.py
index ec78f007ca..63203cea35 100644
--- a/tests/storage/test_presence.py
+++ b/tests/storage/test_presence.py
@@ -35,33 +35,6 @@ class PresenceStoreTestCase(unittest.TestCase):
         self.u_banana = UserID.from_string("@banana:test")
 
     @defer.inlineCallbacks
-    def test_visibility(self):
-        self.assertFalse((yield self.store.is_presence_visible(
-            observed_localpart=self.u_apple.localpart,
-            observer_userid=self.u_banana.to_string(),
-        )))
-
-        yield self.store.allow_presence_visible(
-            observed_localpart=self.u_apple.localpart,
-            observer_userid=self.u_banana.to_string(),
-        )
-
-        self.assertTrue((yield self.store.is_presence_visible(
-            observed_localpart=self.u_apple.localpart,
-            observer_userid=self.u_banana.to_string(),
-        )))
-
-        yield self.store.disallow_presence_visible(
-            observed_localpart=self.u_apple.localpart,
-            observer_userid=self.u_banana.to_string(),
-        )
-
-        self.assertFalse((yield self.store.is_presence_visible(
-            observed_localpart=self.u_apple.localpart,
-            observer_userid=self.u_banana.to_string(),
-        )))
-
-    @defer.inlineCallbacks
     def test_presence_list(self):
         self.assertEquals(
             [],
diff --git a/tests/storage/test_redaction.py b/tests/storage/test_redaction.py
index 5880409867..6afaca3a61 100644
--- a/tests/storage/test_redaction.py
+++ b/tests/storage/test_redaction.py
@@ -110,22 +110,10 @@ class RedactionTestCase(unittest.TestCase):
             self.room1, self.u_alice, Membership.JOIN
         )
 
-        start = yield self.store.get_room_events_max_id()
-
         msg_event = yield self.inject_message(self.room1, self.u_alice, u"t")
 
-        end = yield self.store.get_room_events_max_id()
-
-        results, _ = yield self.store.get_room_events_stream(
-            self.u_alice.to_string(),
-            start,
-            end,
-        )
-
-        self.assertEqual(1, len(results))
-
         # Check event has not been redacted:
-        event = results[0]
+        event = yield self.store.get_event(msg_event.event_id)
 
         self.assertObjectHasAttributes(
             {
@@ -144,17 +132,7 @@ class RedactionTestCase(unittest.TestCase):
             self.room1, msg_event.event_id, self.u_alice, reason
         )
 
-        results, _ = yield self.store.get_room_events_stream(
-            self.u_alice.to_string(),
-            start,
-            end,
-        )
-
-        self.assertEqual(1, len(results))
-
-        # Check redaction
-
-        event = results[0]
+        event = yield self.store.get_event(msg_event.event_id)
 
         self.assertEqual(msg_event.event_id, event.event_id)
 
@@ -184,25 +162,12 @@ class RedactionTestCase(unittest.TestCase):
             self.room1, self.u_alice, Membership.JOIN
         )
 
-        start = yield self.store.get_room_events_max_id()
-
         msg_event = yield self.inject_room_member(
             self.room1, self.u_bob, Membership.JOIN,
             extra_content={"blue": "red"},
         )
 
-        end = yield self.store.get_room_events_max_id()
-
-        results, _ = yield self.store.get_room_events_stream(
-            self.u_alice.to_string(),
-            start,
-            end,
-        )
-
-        self.assertEqual(1, len(results))
-
-        # Check event has not been redacted:
-        event = results[0]
+        event = yield self.store.get_event(msg_event.event_id)
 
         self.assertObjectHasAttributes(
             {
@@ -221,17 +186,9 @@ class RedactionTestCase(unittest.TestCase):
             self.room1, msg_event.event_id, self.u_alice, reason
         )
 
-        results, _ = yield self.store.get_room_events_stream(
-            self.u_alice.to_string(),
-            start,
-            end,
-        )
-
-        self.assertEqual(1, len(results))
-
         # Check redaction
 
-        event = results[0]
+        event = yield self.store.get_event(msg_event.event_id)
 
         self.assertTrue("redacted_because" in event.unsigned)
 
diff --git a/tests/storage/test_roommember.py b/tests/storage/test_roommember.py
index b029ff0584..997090fe35 100644
--- a/tests/storage/test_roommember.py
+++ b/tests/storage/test_roommember.py
@@ -71,13 +71,6 @@ class RoomMemberStoreTestCase(unittest.TestCase):
         yield self.inject_room_member(self.room, self.u_alice, Membership.JOIN)
 
         self.assertEquals(
-            Membership.JOIN,
-            (yield self.store.get_room_member(
-                user_id=self.u_alice.to_string(),
-                room_id=self.room.to_string(),
-            )).membership
-        )
-        self.assertEquals(
             [self.u_alice.to_string()],
             [m.user_id for m in (
                 yield self.store.get_room_members(self.room.to_string())
diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py
deleted file mode 100644
index da322152c7..0000000000
--- a/tests/storage/test_stream.py
+++ /dev/null
@@ -1,185 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014-2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from tests import unittest
-from twisted.internet import defer
-
-from synapse.api.constants import EventTypes, Membership
-from synapse.types import UserID, RoomID
-from tests.storage.event_injector import EventInjector
-
-from tests.utils import setup_test_homeserver
-
-from mock import Mock
-
-
-class StreamStoreTestCase(unittest.TestCase):
-
-    @defer.inlineCallbacks
-    def setUp(self):
-        hs = yield setup_test_homeserver(
-            resource_for_federation=Mock(),
-            http_client=None,
-        )
-
-        self.store = hs.get_datastore()
-        self.event_builder_factory = hs.get_event_builder_factory()
-        self.event_injector = EventInjector(hs)
-        self.handlers = hs.get_handlers()
-        self.message_handler = self.handlers.message_handler
-
-        self.u_alice = UserID.from_string("@alice:test")
-        self.u_bob = UserID.from_string("@bob:test")
-
-        self.room1 = RoomID.from_string("!abc123:test")
-        self.room2 = RoomID.from_string("!xyx987:test")
-
-    @defer.inlineCallbacks
-    def test_event_stream_get_other(self):
-        # Both bob and alice joins the room
-        yield self.event_injector.inject_room_member(
-            self.room1, self.u_alice, Membership.JOIN
-        )
-        yield self.event_injector.inject_room_member(
-            self.room1, self.u_bob, Membership.JOIN
-        )
-
-        # Initial stream key:
-        start = yield self.store.get_room_events_max_id()
-
-        yield self.event_injector.inject_message(self.room1, self.u_alice, u"test")
-
-        end = yield self.store.get_room_events_max_id()
-
-        results, _ = yield self.store.get_room_events_stream(
-            self.u_bob.to_string(),
-            start,
-            end,
-        )
-
-        self.assertEqual(1, len(results))
-
-        event = results[0]
-
-        self.assertObjectHasAttributes(
-            {
-                "type": EventTypes.Message,
-                "user_id": self.u_alice.to_string(),
-                "content": {"body": "test", "msgtype": "message"},
-            },
-            event,
-        )
-
-    @defer.inlineCallbacks
-    def test_event_stream_get_own(self):
-        # Both bob and alice joins the room
-        yield self.event_injector.inject_room_member(
-            self.room1, self.u_alice, Membership.JOIN
-        )
-        yield self.event_injector.inject_room_member(
-            self.room1, self.u_bob, Membership.JOIN
-        )
-
-        # Initial stream key:
-        start = yield self.store.get_room_events_max_id()
-
-        yield self.event_injector.inject_message(self.room1, self.u_alice, u"test")
-
-        end = yield self.store.get_room_events_max_id()
-
-        results, _ = yield self.store.get_room_events_stream(
-            self.u_alice.to_string(),
-            start,
-            end,
-        )
-
-        self.assertEqual(1, len(results))
-
-        event = results[0]
-
-        self.assertObjectHasAttributes(
-            {
-                "type": EventTypes.Message,
-                "user_id": self.u_alice.to_string(),
-                "content": {"body": "test", "msgtype": "message"},
-            },
-            event,
-        )
-
-    @defer.inlineCallbacks
-    def test_event_stream_join_leave(self):
-        # Both bob and alice joins the room
-        yield self.event_injector.inject_room_member(
-            self.room1, self.u_alice, Membership.JOIN
-        )
-        yield self.event_injector.inject_room_member(
-            self.room1, self.u_bob, Membership.JOIN
-        )
-
-        # Then bob leaves again.
-        yield self.event_injector.inject_room_member(
-            self.room1, self.u_bob, Membership.LEAVE
-        )
-
-        # Initial stream key:
-        start = yield self.store.get_room_events_max_id()
-
-        yield self.event_injector.inject_message(self.room1, self.u_alice, u"test")
-
-        end = yield self.store.get_room_events_max_id()
-
-        results, _ = yield self.store.get_room_events_stream(
-            self.u_bob.to_string(),
-            start,
-            end,
-        )
-
-        # We should not get the message, as it happened *after* bob left.
-        self.assertEqual(0, len(results))
-
-    @defer.inlineCallbacks
-    def test_event_stream_prev_content(self):
-        yield self.event_injector.inject_room_member(
-            self.room1, self.u_bob, Membership.JOIN
-        )
-
-        yield self.event_injector.inject_room_member(
-            self.room1, self.u_alice, Membership.JOIN
-        )
-
-        start = yield self.store.get_room_events_max_id()
-
-        yield self.event_injector.inject_room_member(
-            self.room1, self.u_alice, Membership.JOIN,
-        )
-
-        end = yield self.store.get_room_events_max_id()
-
-        results, _ = yield self.store.get_room_events_stream(
-            self.u_bob.to_string(),
-            start,
-            end,
-        )
-
-        # We should not get the message, as it happened *after* bob left.
-        self.assertEqual(1, len(results))
-
-        event = results[0]
-
-        self.assertTrue(
-            "prev_content" in event.unsigned,
-            msg="No prev_content key"
-        )