diff options
author | Richard van der Hoff <richard@matrix.org> | 2018-02-03 22:40:28 +0000 |
---|---|---|
committer | Richard van der Hoff <richard@matrix.org> | 2018-02-03 22:40:28 +0000 |
commit | bb9f0f3cdb57882248b6426dd4ec5bd5781daaef (patch) | |
tree | 41a8ee0b215406119ec5ef44cf0a3326a8ae1636 /tests/util | |
parent | oops (diff) | |
parent | Merge pull request #2837 from matrix-org/rav/fix_quarantine_media (diff) | |
download | synapse-bb9f0f3cdb57882248b6426dd4ec5bd5781daaef.tar.xz |
Merge branch 'develop' into matthew/gin_work_mem
Diffstat (limited to '')
-rw-r--r-- | tests/util/test_file_consumer.py | 176 | ||||
-rw-r--r-- | tests/util/test_logcontext.py | 16 | ||||
-rw-r--r-- | tests/utils.py | 249 |
3 files changed, 247 insertions, 194 deletions
diff --git a/tests/util/test_file_consumer.py b/tests/util/test_file_consumer.py new file mode 100644 index 0000000000..76e2234255 --- /dev/null +++ b/tests/util/test_file_consumer.py @@ -0,0 +1,176 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from twisted.internet import defer, reactor +from mock import NonCallableMock + +from synapse.util.file_consumer import BackgroundFileConsumer + +from tests import unittest +from StringIO import StringIO + +import threading + + +class FileConsumerTests(unittest.TestCase): + + @defer.inlineCallbacks + def test_pull_consumer(self): + string_file = StringIO() + consumer = BackgroundFileConsumer(string_file) + + try: + producer = DummyPullProducer() + + yield producer.register_with_consumer(consumer) + + yield producer.write_and_wait("Foo") + + self.assertEqual(string_file.getvalue(), "Foo") + + yield producer.write_and_wait("Bar") + + self.assertEqual(string_file.getvalue(), "FooBar") + finally: + consumer.unregisterProducer() + + yield consumer.wait() + + self.assertTrue(string_file.closed) + + @defer.inlineCallbacks + def test_push_consumer(self): + string_file = BlockingStringWrite() + consumer = BackgroundFileConsumer(string_file) + + try: + producer = NonCallableMock(spec_set=[]) + + consumer.registerProducer(producer, True) + + consumer.write("Foo") + yield string_file.wait_for_n_writes(1) + + self.assertEqual(string_file.buffer, "Foo") + + consumer.write("Bar") + yield string_file.wait_for_n_writes(2) + + self.assertEqual(string_file.buffer, "FooBar") + finally: + consumer.unregisterProducer() + + yield consumer.wait() + + self.assertTrue(string_file.closed) + + @defer.inlineCallbacks + def test_push_producer_feedback(self): + string_file = BlockingStringWrite() + consumer = BackgroundFileConsumer(string_file) + + try: + producer = NonCallableMock(spec_set=["pauseProducing", "resumeProducing"]) + + resume_deferred = defer.Deferred() + producer.resumeProducing.side_effect = lambda: resume_deferred.callback(None) + + consumer.registerProducer(producer, True) + + number_writes = 0 + with string_file.write_lock: + for _ in range(consumer._PAUSE_ON_QUEUE_SIZE): + consumer.write("Foo") + number_writes += 1 + + producer.pauseProducing.assert_called_once() + + yield string_file.wait_for_n_writes(number_writes) + + yield resume_deferred + producer.resumeProducing.assert_called_once() + finally: + consumer.unregisterProducer() + + yield consumer.wait() + + self.assertTrue(string_file.closed) + + +class DummyPullProducer(object): + def __init__(self): + self.consumer = None + self.deferred = defer.Deferred() + + def resumeProducing(self): + d = self.deferred + self.deferred = defer.Deferred() + d.callback(None) + + def write_and_wait(self, bytes): + d = self.deferred + self.consumer.write(bytes) + return d + + def register_with_consumer(self, consumer): + d = self.deferred + self.consumer = consumer + self.consumer.registerProducer(self, False) + return d + + +class BlockingStringWrite(object): + def __init__(self): + self.buffer = "" + self.closed = False + self.write_lock = threading.Lock() + + self._notify_write_deferred = None + self._number_of_writes = 0 + + def write(self, bytes): + with self.write_lock: + self.buffer += bytes + self._number_of_writes += 1 + + reactor.callFromThread(self._notify_write) + + def close(self): + self.closed = True + + def _notify_write(self): + "Called by write to indicate a write happened" + with self.write_lock: + if not self._notify_write_deferred: + return + d = self._notify_write_deferred + self._notify_write_deferred = None + d.callback(None) + + @defer.inlineCallbacks + def wait_for_n_writes(self, n): + "Wait for n writes to have happened" + while True: + with self.write_lock: + if n <= self._number_of_writes: + return + + if not self._notify_write_deferred: + self._notify_write_deferred = defer.Deferred() + + d = self._notify_write_deferred + + yield d diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index e2f7765f49..4850722bc5 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -12,12 +12,12 @@ class LoggingContextTestCase(unittest.TestCase): def _check_test_key(self, value): self.assertEquals( - LoggingContext.current_context().test_key, value + LoggingContext.current_context().request, value ) def test_with_context(self): with LoggingContext() as context_one: - context_one.test_key = "test" + context_one.request = "test" self._check_test_key("test") @defer.inlineCallbacks @@ -25,14 +25,14 @@ class LoggingContextTestCase(unittest.TestCase): @defer.inlineCallbacks def competing_callback(): with LoggingContext() as competing_context: - competing_context.test_key = "competing" + competing_context.request = "competing" yield sleep(0) self._check_test_key("competing") reactor.callLater(0, competing_callback) with LoggingContext() as context_one: - context_one.test_key = "one" + context_one.request = "one" yield sleep(0) self._check_test_key("one") @@ -43,14 +43,14 @@ class LoggingContextTestCase(unittest.TestCase): @defer.inlineCallbacks def cb(): - context_one.test_key = "one" + context_one.request = "one" yield function() self._check_test_key("one") callback_completed[0] = True with LoggingContext() as context_one: - context_one.test_key = "one" + context_one.request = "one" # fire off function, but don't wait on it. logcontext.preserve_fn(cb)() @@ -107,7 +107,7 @@ class LoggingContextTestCase(unittest.TestCase): sentinel_context = LoggingContext.current_context() with LoggingContext() as context_one: - context_one.test_key = "one" + context_one.request = "one" d1 = logcontext.make_deferred_yieldable(blocking_function()) # make sure that the context was reset by make_deferred_yieldable @@ -124,7 +124,7 @@ class LoggingContextTestCase(unittest.TestCase): argument isn't actually a deferred""" with LoggingContext() as context_one: - context_one.test_key = "one" + context_one.request = "one" d1 = logcontext.make_deferred_yieldable("bum") self._check_test_key("one") diff --git a/tests/utils.py b/tests/utils.py index ed8a7360f5..8efd3a3475 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -13,27 +13,28 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.http.server import HttpServer -from synapse.api.errors import cs_error, CodeMessageException, StoreError -from synapse.api.constants import EventTypes -from synapse.storage.prepare_database import prepare_database -from synapse.storage.engines import create_engine -from synapse.server import HomeServer -from synapse.federation.transport import server -from synapse.util.ratelimitutils import FederationRateLimiter - -from synapse.util.logcontext import LoggingContext - -from twisted.internet import defer, reactor -from twisted.enterprise.adbapi import ConnectionPool - -from collections import namedtuple -from mock import patch, Mock import hashlib +from inspect import getcallargs import urllib import urlparse -from inspect import getcallargs +from mock import Mock, patch +from twisted.internet import defer, reactor + +from synapse.api.errors import CodeMessageException, cs_error +from synapse.federation.transport import server +from synapse.http.server import HttpServer +from synapse.server import HomeServer +from synapse.storage import PostgresEngine +from synapse.storage.engines import create_engine +from synapse.storage.prepare_database import prepare_database +from synapse.util.logcontext import LoggingContext +from synapse.util.ratelimitutils import FederationRateLimiter + +# set this to True to run the tests against postgres instead of sqlite. +# It requires you to have a local postgres database called synapse_test, within +# which ALL TABLES WILL BE DROPPED +USE_POSTGRES_FOR_TESTS = False @defer.inlineCallbacks @@ -57,32 +58,70 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): config.worker_app = None config.email_enable_notifs = False config.block_non_admin_invites = False + config.federation_domain_whitelist = None + config.user_directory_search_all_users = False + + # disable user directory updates, because they get done in the + # background, which upsets the test runner. + config.update_user_directory = False config.use_frozen_dicts = True - config.database_config = {"name": "sqlite3"} config.ldap_enabled = False if "clock" not in kargs: kargs["clock"] = MockClock() + if USE_POSTGRES_FOR_TESTS: + config.database_config = { + "name": "psycopg2", + "args": { + "database": "synapse_test", + "cp_min": 1, + "cp_max": 5, + }, + } + else: + config.database_config = { + "name": "sqlite3", + "args": { + "database": ":memory:", + "cp_min": 1, + "cp_max": 1, + }, + } + + db_engine = create_engine(config.database_config) + + # we need to configure the connection pool to run the on_new_connection + # function, so that we can test code that uses custom sqlite functions + # (like rank). + config.database_config["args"]["cp_openfun"] = db_engine.on_new_connection + if datastore is None: - db_pool = SQLiteMemoryDbPool() - yield db_pool.prepare() hs = HomeServer( - name, db_pool=db_pool, config=config, + name, config=config, + db_config=config.database_config, version_string="Synapse/tests", - database_engine=create_engine(config.database_config), - get_db_conn=db_pool.get_db_conn, + database_engine=db_engine, room_list_handler=object(), tls_server_context_factory=Mock(), **kargs ) + db_conn = hs.get_db_conn() + # make sure that the database is empty + if isinstance(db_engine, PostgresEngine): + cur = db_conn.cursor() + cur.execute("SELECT tablename FROM pg_tables where schemaname='public'") + rows = cur.fetchall() + for r in rows: + cur.execute("DROP TABLE %s CASCADE" % r[0]) + yield prepare_database(db_conn, db_engine, config) hs.setup() else: hs = HomeServer( name, db_pool=None, datastore=datastore, config=config, version_string="Synapse/tests", - database_engine=create_engine(config.database_config), + database_engine=db_engine, room_list_handler=object(), tls_server_context_factory=Mock(), **kargs @@ -301,168 +340,6 @@ class MockClock(object): return d -class SQLiteMemoryDbPool(ConnectionPool, object): - def __init__(self): - super(SQLiteMemoryDbPool, self).__init__( - "sqlite3", ":memory:", - cp_min=1, - cp_max=1, - ) - - self.config = Mock() - self.config.password_providers = [] - self.config.database_config = {"name": "sqlite3"} - - def prepare(self): - engine = self.create_engine() - return self.runWithConnection( - lambda conn: prepare_database(conn, engine, self.config) - ) - - def get_db_conn(self): - conn = self.connect() - engine = self.create_engine() - prepare_database(conn, engine, self.config) - return conn - - def create_engine(self): - return create_engine(self.config.database_config) - - -class MemoryDataStore(object): - - Room = namedtuple( - "Room", - ["room_id", "is_public", "creator"] - ) - - def __init__(self): - self.tokens_to_users = {} - self.paths_to_content = {} - - self.members = {} - self.rooms = {} - - self.current_state = {} - self.events = [] - - class Snapshot(namedtuple("Snapshot", "room_id user_id membership_state")): - def fill_out_prev_events(self, event): - pass - - def snapshot_room(self, room_id, user_id, state_type=None, state_key=None): - return self.Snapshot( - room_id, user_id, self.get_room_member(user_id, room_id) - ) - - def register(self, user_id, token, password_hash): - if user_id in self.tokens_to_users.values(): - raise StoreError(400, "User in use.") - self.tokens_to_users[token] = user_id - - def get_user_by_access_token(self, token): - try: - return { - "name": self.tokens_to_users[token], - } - except Exception: - raise StoreError(400, "User does not exist.") - - def get_room(self, room_id): - try: - return self.rooms[room_id] - except Exception: - return None - - def store_room(self, room_id, room_creator_user_id, is_public): - if room_id in self.rooms: - raise StoreError(409, "Conflicting room!") - - room = MemoryDataStore.Room( - room_id=room_id, - is_public=is_public, - creator=room_creator_user_id - ) - self.rooms[room_id] = room - - def get_room_member(self, user_id, room_id): - return self.members.get(room_id, {}).get(user_id) - - def get_room_members(self, room_id, membership=None): - if membership: - return [ - v for k, v in self.members.get(room_id, {}).items() - if v.membership == membership - ] - else: - return self.members.get(room_id, {}).values() - - def get_rooms_for_user_where_membership_is(self, user_id, membership_list): - return [ - m[user_id] for m in self.members.values() - if user_id in m and m[user_id].membership in membership_list - ] - - def get_room_events_stream(self, user_id=None, from_key=None, to_key=None, - limit=0, with_feedback=False): - return ([], from_key) # TODO - - def get_joined_hosts_for_room(self, room_id): - return defer.succeed([]) - - def persist_event(self, event): - if event.type == EventTypes.Member: - room_id = event.room_id - user = event.state_key - self.members.setdefault(room_id, {})[user] = event - - if hasattr(event, "state_key"): - key = (event.room_id, event.type, event.state_key) - self.current_state[key] = event - - self.events.append(event) - - def get_current_state(self, room_id, event_type=None, state_key=""): - if event_type: - key = (room_id, event_type, state_key) - if self.current_state.get(key): - return [self.current_state.get(key)] - return None - else: - return [ - e for e in self.current_state - if e[0] == room_id - ] - - def set_presence_state(self, user_localpart, state): - return defer.succeed({"state": 0}) - - def get_presence_list(self, user_localpart, accepted): - return [] - - def get_room_events_max_id(self): - return "s0" # TODO (erikj) - - def get_send_event_level(self, room_id): - return defer.succeed(0) - - def get_power_level(self, room_id, user_id): - return defer.succeed(0) - - def get_add_state_level(self, room_id): - return defer.succeed(0) - - def get_room_join_rule(self, room_id): - # TODO (erikj): This should be configurable - return defer.succeed("invite") - - def get_ops_levels(self, room_id): - return defer.succeed((5, 5, 5)) - - def insert_client_ip(self, user, access_token, ip, user_agent): - return defer.succeed(None) - - def _format_call(args, kwargs): return ", ".join( ["%r" % (a) for a in args] + |