From 63878c03794d33a8767425e114845159e5c1cb9a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 13 May 2015 13:42:21 +0100 Subject: Don't bother checking for updates if the stream token hasn't advanced for a user --- tests/handlers/test_federation.py | 4 ++-- tests/handlers/test_room.py | 8 +++++--- tests/handlers/test_typing.py | 12 ++++++------ tests/rest/client/v1/test_presence.py | 15 ++++++++++----- tests/utils.py | 2 +- 5 files changed, 24 insertions(+), 17 deletions(-) (limited to 'tests') diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index 08d2404b6c..f3821242bc 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -83,7 +83,7 @@ class FederationTestCase(unittest.TestCase): "hashes": {"sha256":"AcLrgtUIqqwaGoHhrEvYG1YLDIsVPYJdSRGhkp3jJp8"}, }) - self.datastore.persist_event.return_value = defer.succeed(None) + self.datastore.persist_event.return_value = defer.succeed((1,1)) self.datastore.get_room.return_value = defer.succeed(True) self.auth.check_host_in_room.return_value = defer.succeed(True) @@ -126,5 +126,5 @@ class FederationTestCase(unittest.TestCase): self.auth.check.assert_called_once_with(ANY, auth_events={}) self.notifier.on_new_room_event.assert_called_once_with( - ANY, extra_users=[] + ANY, 1, 1, extra_users=[] ) diff --git a/tests/handlers/test_room.py b/tests/handlers/test_room.py index 6417f73309..a2d7635995 100644 --- a/tests/handlers/test_room.py +++ b/tests/handlers/test_room.py @@ -87,6 +87,8 @@ class RoomMemberHandlerTestCase(unittest.TestCase): self.ratelimiter = hs.get_ratelimiter() self.ratelimiter.send_message.return_value = (True, 0) + self.datastore.persist_event.return_value = (1,1) + @defer.inlineCallbacks def test_invite(self): room_id = "!foo:red" @@ -160,7 +162,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): event, context=context, ) self.notifier.on_new_room_event.assert_called_once_with( - event, extra_users=[UserID.from_string(target_user_id)] + event, 1, 1, extra_users=[UserID.from_string(target_user_id)] ) self.assertFalse(self.datastore.get_room.called) self.assertFalse(self.datastore.store_room.called) @@ -226,7 +228,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): event, context=context ) self.notifier.on_new_room_event.assert_called_once_with( - event, extra_users=[user] + event, 1, 1, extra_users=[user] ) join_signal_observer.assert_called_with( @@ -304,7 +306,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): event, context=context ) self.notifier.on_new_room_event.assert_called_once_with( - event, extra_users=[user] + event, 1, 1, extra_users=[user] ) leave_signal_observer.assert_called_with( diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index b318d4944a..7ccbe2ea9c 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -183,7 +183,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ) self.on_new_user_event.assert_has_calls([ - call(rooms=[self.room_id]), + call('typing_key', 1, rooms=[self.room_id]), ]) self.assertEquals(self.event_source.get_current_key(), 1) @@ -246,7 +246,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ) self.on_new_user_event.assert_has_calls([ - call(rooms=[self.room_id]), + call('typing_key', 1, rooms=[self.room_id]), ]) self.assertEquals(self.event_source.get_current_key(), 1) @@ -300,7 +300,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ) self.on_new_user_event.assert_has_calls([ - call(rooms=[self.room_id]), + call('typing_key', 1, rooms=[self.room_id]), ]) yield put_json.await_calls() @@ -332,7 +332,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ) self.on_new_user_event.assert_has_calls([ - call(rooms=[self.room_id]), + call('typing_key', 1, rooms=[self.room_id]), ]) self.on_new_user_event.reset_mock() @@ -352,7 +352,7 @@ class TypingNotificationsTestCase(unittest.TestCase): self.clock.advance_time(11) self.on_new_user_event.assert_has_calls([ - call(rooms=[self.room_id]), + call('typing_key', 2, rooms=[self.room_id]), ]) self.assertEquals(self.event_source.get_current_key(), 2) @@ -378,7 +378,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ) self.on_new_user_event.assert_has_calls([ - call(rooms=[self.room_id]), + call('typing_key', 3, rooms=[self.room_id]), ]) self.on_new_user_event.reset_mock() diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index 8e0c5fa630..c0c52796ad 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -27,6 +27,7 @@ from synapse.handlers.presence import PresenceHandler from synapse.rest.client.v1 import presence from synapse.rest.client.v1 import events from synapse.types import UserID +from synapse.util.async import run_on_reactor OFFLINE = PresenceState.OFFLINE @@ -264,6 +265,7 @@ class PresenceEventStreamTestCase(unittest.TestCase): datastore=Mock(spec=[ "set_presence_state", "get_presence_list", + "get_rooms_for_user", ]), clock=Mock(spec=[ "call_later", @@ -298,6 +300,9 @@ class PresenceEventStreamTestCase(unittest.TestCase): self.mock_datastore.get_app_service_by_user_id = Mock( return_value=defer.succeed(None) ) + self.mock_datastore.get_rooms_for_user = ( + lambda u: get_rooms_for_user(UserID.from_string(u)) + ) def get_profile_displayname(user_id): return defer.succeed("Frank") @@ -350,19 +355,19 @@ class PresenceEventStreamTestCase(unittest.TestCase): self.mock_datastore.set_presence_state.return_value = defer.succeed( {"state": ONLINE} ) - self.mock_datastore.get_presence_list.return_value = defer.succeed( - [] - ) + self.mock_datastore.get_presence_list.return_value = defer.succeed([]) yield self.presence.set_state(self.u_banana, self.u_banana, state={"presence": ONLINE} ) + yield run_on_reactor() + (code, response) = yield self.mock_resource.trigger("GET", - "/events?from=0_1_0&timeout=0", None) + "/events?from=s0_1_0&timeout=0", None) self.assertEquals(200, code) - self.assertEquals({"start": "0_1_0", "end": "0_2_0", "chunk": [ + self.assertEquals({"start": "s0_1_0", "end": "s0_2_0", "chunk": [ {"type": "m.presence", "content": { "user_id": "@banana:test", diff --git a/tests/utils.py b/tests/utils.py index cc038fecf1..a67530bd63 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -355,7 +355,7 @@ class MemoryDataStore(object): return [] def get_room_events_max_id(self): - return 0 # TODO (erikj) + return "s0" # TODO (erikj) def get_send_event_level(self, room_id): return defer.succeed(0) -- cgit 1.5.1 From f1b83d88a3d3ad596631e51852a9802d0a7270a0 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 13 May 2015 16:54:02 +0100 Subject: Discard unused NotifierUserStreams --- synapse/notifier.py | 50 ++++++++++++++++++++++++----------- tests/rest/client/v1/test_presence.py | 1 + tests/utils.py | 3 +++ 3 files changed, 38 insertions(+), 16 deletions(-) (limited to 'tests') diff --git a/synapse/notifier.py b/synapse/notifier.py index 6fcb7767a0..344dd03172 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -71,14 +71,17 @@ class _NotifierUserStream(object): so that it can remove itself from the indexes in the Notifier class. """ - def __init__(self, user, rooms, current_token, appservice=None): + def __init__(self, user, rooms, current_token, time_now_ms, + appservice=None): self.user = str(user) self.appservice = appservice self.listeners = set() self.rooms = set(rooms) self.current_token = current_token + self.last_notified_ms = time_now_ms - def notify(self, stream_key, stream_id): + def notify(self, stream_key, stream_id, time_now_ms): + self.last_notified_ms = time_now_ms self.current_token = self.current_token.copy_and_replace( stream_key, stream_id ) @@ -96,7 +99,7 @@ class _NotifierUserStream(object): lst = notifier.room_to_user_streams.get(room, set()) lst.discard(self) - notifier.user_to_user_streams.get(self.user, set()).discard(self) + notifier.user_to_user_stream.pop(self.user) if self.appservice: notifier.appservice_to_user_streams.get( @@ -111,6 +114,8 @@ class Notifier(object): Primarily used from the /events stream. """ + UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000 + def __init__(self, hs): self.hs = hs @@ -128,6 +133,10 @@ class Notifier(object): "user_joined_room", self._user_joined_room ) + self.clock.looping_call( + self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS + ) + # This is not a very cheap test to perform, but it's only executed # when rendering the metrics page, which is likely once per minute at # most when scraping it. @@ -221,9 +230,12 @@ class Notifier(object): logger.debug("on_new_room_event listeners %s", user_streams) + time_now_ms = self.clock.time_msec() for user_stream in user_streams: try: - user_stream.notify("room_key", "s%d" % (room_stream_id,)) + user_stream.notify( + "room_key", "s%d" % (room_stream_id,), time_now_ms + ) except: logger.exception("Failed to notify listener") @@ -246,9 +258,10 @@ class Notifier(object): for room in rooms: user_streams |= self.room_to_user_streams.get(room, set()) + time_now_ms = self.clock.time_msec() for user_stream in user_streams: try: - user_stream.notify(stream_key, new_token) + user_stream.notify(stream_key, new_token, time_now_ms) except: logger.exception("Failed to notify listener") @@ -260,6 +273,7 @@ class Notifier(object): """ deferred = defer.Deferred() + time_now_ms = self.clock.time_msec() user = str(user) user_stream = self.user_to_user_stream.get(user) @@ -272,6 +286,7 @@ class Notifier(object): rooms=rooms, appservice=appservice, current_token=current_token, + time_now_ms=time_now_ms, ) self._register_with_keys(user_stream) else: @@ -365,6 +380,20 @@ class Notifier(object): defer.returnValue(result) + @log_function + def remove_expired_streams(self): + time_now_ms = self.clock.time_msec() + expired_streams = [] + expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY_MS + for stream in self.user_to_user_stream.values(): + if stream.listeners: + continue + if stream.last_notified_ms < expire_before_ts: + expired_streams.append(stream) + + for expired_stream in expired_streams: + expired_stream.remove(self) + @log_function def _register_with_keys(self, user_stream): self.user_to_user_stream[user_stream.user] = user_stream @@ -385,14 +414,3 @@ class Notifier(object): room_streams = self.room_to_user_streams.setdefault(room_id, set()) room_streams.add(new_user_stream) new_user_stream.rooms.add(room_id) - - -def _discard_if_notified(listener_set): - """Remove any 'stale' listeners from the given set. - """ - to_discard = set() - for l in listener_set: - if l.notified(): - to_discard.add(l) - - listener_set -= to_discard diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index c0c52796ad..29c0038f06 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -271,6 +271,7 @@ class PresenceEventStreamTestCase(unittest.TestCase): "call_later", "cancel_call_later", "time_msec", + "looping_call", ]), ) diff --git a/tests/utils.py b/tests/utils.py index a67530bd63..3b5c335911 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -197,6 +197,9 @@ class MockClock(object): return t + def looping_call(self, function, interval): + pass + def cancel_call_later(self, timer): if timer[2]: raise Exception("Cannot cancel an expired timer") -- cgit 1.5.1 From c5d1b4986bbb5983054b64fdc3dd3c32e80e3c17 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 14 May 2015 14:59:31 +0100 Subject: Remove unused arguments and doc PresenceHandler.push_update_to_clients --- synapse/handlers/presence.py | 20 ++++++++----------- tests/handlers/test_presence.py | 22 +++++---------------- tests/handlers/test_presencelike.py | 39 ++++++++----------------------------- 3 files changed, 21 insertions(+), 60 deletions(-) (limited to 'tests') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 1edab05492..0c246958ac 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -496,9 +496,7 @@ class PresenceHandler(BaseHandler): # We want to tell the person that just came online # presence state of people they are interested in? self.push_update_to_clients( - observed_user=target_user, users_to_push=[user], - statuscache=self._get_or_offline_usercache(target_user), ) deferreds = [] @@ -712,10 +710,7 @@ class PresenceHandler(BaseHandler): continue self.push_update_to_clients( - observed_user=user, - users_to_push=observers, - room_ids=room_ids, - statuscache=statuscache, + users_to_push=observers, room_ids=room_ids ) user_id = user.to_string() @@ -779,10 +774,7 @@ class PresenceHandler(BaseHandler): localusers = set(localusers) self.push_update_to_clients( - observed_user=observed_user, - users_to_push=localusers, - room_ids=room_ids, - statuscache=statuscache, + users_to_push=localusers, room_ids=room_ids ) remote_domains = set(remote_domains) @@ -807,8 +799,12 @@ class PresenceHandler(BaseHandler): defer.returnValue((localusers, remote_domains)) - def push_update_to_clients(self, observed_user, users_to_push=[], - room_ids=[], statuscache=None): + def push_update_to_clients(self, users_to_push=[], room_ids=[]): + """Notify clients of a new presence event. + Args: + users_to_push(list): List of users to notify. + room_ids(list): List of room_ids to notify. + """ with PreserveLoggingContext(): self.notifier.on_new_user_event( users_to_push, diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 70147b017e..ee773797e7 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -1097,12 +1097,8 @@ class PresencePollingTestCase(MockedDatastorePresenceTestCase): # apple should see both banana and clementine currently offline self.mock_update_client.assert_has_calls([ - call(users_to_push=[self.u_apple], - observed_user=self.u_banana, - statuscache=ANY), - call(users_to_push=[self.u_apple], - observed_user=self.u_clementine, - statuscache=ANY), + call(users_to_push=[self.u_apple]), + call(users_to_push=[self.u_apple]), ], any_order=True) # Gut-wrenching tests @@ -1121,13 +1117,8 @@ class PresencePollingTestCase(MockedDatastorePresenceTestCase): # apple and banana should now both see each other online self.mock_update_client.assert_has_calls([ - call(users_to_push=set([self.u_apple]), - observed_user=self.u_banana, - room_ids=[], - statuscache=ANY), - call(users_to_push=[self.u_banana], - observed_user=self.u_apple, - statuscache=ANY), + call(users_to_push=set([self.u_apple]), room_ids=[]), + call(users_to_push=[self.u_banana]), ], any_order=True) self.assertTrue("apple" in self.handler._local_pushmap) @@ -1143,10 +1134,7 @@ class PresencePollingTestCase(MockedDatastorePresenceTestCase): # banana should now be told apple is offline self.mock_update_client.assert_has_calls([ - call(users_to_push=set([self.u_banana, self.u_apple]), - observed_user=self.u_apple, - room_ids=[], - statuscache=ANY), + call(users_to_push=set([self.u_banana, self.u_apple]), room_ids=[]), ], any_order=True) self.assertFalse("banana" in self.handler._local_pushmap) diff --git a/tests/handlers/test_presencelike.py b/tests/handlers/test_presencelike.py index 977e832da7..1f2e66ac11 100644 --- a/tests/handlers/test_presencelike.py +++ b/tests/handlers/test_presencelike.py @@ -209,20 +209,12 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): ], presence) self.mock_update_client.assert_has_calls([ - call(users_to_push=set([self.u_apple, self.u_banana, self.u_clementine]), - room_ids=[], - observed_user=self.u_apple, - statuscache=ANY), # self-reflection + call( + users_to_push={self.u_apple, self.u_banana, self.u_clementine}, + room_ids=[] + ), ], any_order=True) - statuscache = self.mock_update_client.call_args[1]["statuscache"] - self.assertEquals({ - "presence": ONLINE, - "last_active": 1000000, # MockClock - "displayname": "Frank", - "avatar_url": "http://foo", - }, statuscache.state) - self.mock_update_client.reset_mock() self.datastore.set_profile_displayname.return_value = defer.succeed( @@ -232,21 +224,12 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): self.u_apple, "I am an Apple") self.mock_update_client.assert_has_calls([ - call(users_to_push=set([self.u_apple, self.u_banana, self.u_clementine]), + call( + users_to_push={self.u_apple, self.u_banana, self.u_clementine}, room_ids=[], - observed_user=self.u_apple, - statuscache=ANY), # self-reflection + ), ], any_order=True) - statuscache = self.mock_update_client.call_args[1]["statuscache"] - self.assertEquals({ - "presence": ONLINE, - "last_active": 1000000, # MockClock - "displayname": "I am an Apple", - "avatar_url": "http://foo", - }, statuscache.state) - - @defer.inlineCallbacks def test_push_remote(self): self.presence_list = [ @@ -314,13 +297,7 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): self.mock_update_client.assert_called_with( users_to_push=set([self.u_apple]), room_ids=[], - observed_user=self.u_potato, - statuscache=ANY) - - statuscache = self.mock_update_client.call_args[1]["statuscache"] - self.assertEquals({"presence": ONLINE, - "displayname": "Frank", - "avatar_url": "http://foo"}, statuscache.state) + ) state = yield self.handlers.presence_handler.get_state(self.u_potato, self.u_apple) -- cgit 1.5.1 From a2c4f3f150f63c720370f6882da804c8ac20fd69 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 10:54:04 +0100 Subject: Fix daedlock --- synapse/federation/federation_client.py | 15 +++- synapse/federation/federation_server.py | 2 + synapse/handlers/message.py | 33 ++++++--- synapse/storage/_base.py | 26 +++---- synapse/storage/events.py | 125 +++++++++++++++++++------------- synapse/storage/stream.py | 2 + tests/storage/test_base.py | 3 +- 7 files changed, 122 insertions(+), 84 deletions(-) (limited to 'tests') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 904c7c0945..c255df1bbb 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -222,7 +222,7 @@ class FederationClient(FederationBase): for p in transaction_data["pdus"] ] - if pdu_list: + if pdu_list and pdu_list[0]: pdu = pdu_list[0] # Check signatures are correct. @@ -255,7 +255,7 @@ class FederationClient(FederationBase): ) continue - if self._get_pdu_cache is not None: + if self._get_pdu_cache is not None and pdu: self._get_pdu_cache[event_id] = pdu defer.returnValue(pdu) @@ -475,6 +475,9 @@ class FederationClient(FederationBase): limit (int): Maximum number of events to return. min_depth (int): Minimum depth of events tor return. """ + logger.debug("get_missing_events: latest_events: %r", latest_events) + logger.debug("get_missing_events: earliest_events_ids: %r", earliest_events_ids) + try: content = yield self.transport_layer.get_missing_events( destination=destination, @@ -485,6 +488,8 @@ class FederationClient(FederationBase): min_depth=min_depth, ) + logger.debug("get_missing_events: Got content: %r", content) + events = [ self.event_from_pdu_json(e) for e in content.get("events", []) @@ -494,6 +499,8 @@ class FederationClient(FederationBase): destination, events, outlier=False ) + logger.debug("get_missing_events: signed_events: %r", signed_events) + have_gotten_all_from_destination = True except HttpResponseException as e: if not e.code == 400: @@ -518,6 +525,8 @@ class FederationClient(FederationBase): # Are we missing any? seen_events = set(earliest_events_ids) + + logger.debug("get_missing_events: signed_events2: %r", signed_events) seen_events.update(e.event_id for e in signed_events) missing_events = {} @@ -561,7 +570,7 @@ class FederationClient(FederationBase): res = yield defer.DeferredList(deferreds, consumeErrors=True) for (result, val), (e_id, _) in zip(res, ordered_missing): - if result: + if result and val: signed_events.append(val) else: failed_to_fetch.add(e_id) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index cd79e23f4b..2c6488dd1b 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -415,6 +415,8 @@ class FederationServer(FederationBase): pdu.internal_metadata.outlier = True elif min_depth and pdu.depth > min_depth: if get_missing and prevs - seen: + logger.debug("We're missing: %r", prevs-seen) + latest = yield self.store.get_latest_event_ids_in_room( pdu.room_id ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 867fdbefb0..6a1b25d112 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -303,18 +303,27 @@ class MessageHandler(BaseHandler): if event.membership != Membership.JOIN: return try: - (messages, token), current_state = yield defer.gatherResults( - [ - self.store.get_recent_events_for_room( - event.room_id, - limit=limit, - end_token=now_token.room_key, - ), - self.state_handler.get_current_state( - event.room_id - ), - ] - ).addErrback(unwrapFirstError) + # (messages, token), current_state = yield defer.gatherResults( + # [ + # self.store.get_recent_events_for_room( + # event.room_id, + # limit=limit, + # end_token=now_token.room_key, + # ), + # self.state_handler.get_current_state( + # event.room_id + # ), + # ] + # ).addErrback(unwrapFirstError) + + messages, token = yield self.store.get_recent_events_for_room( + event.room_id, + limit=limit, + end_token=now_token.room_key, + ) + current_state = yield self.state_handler.get_current_state( + event.room_id + ) start_token = now_token.copy_and_replace("room_key", token[0]) end_token = now_token.copy_and_replace("room_key", token[1]) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index ceff99c16d..0df1b46edc 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -301,10 +301,12 @@ class SQLBaseStore(object): self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, max_entries=hs.config.event_cache_size) - self._event_fetch_lock = threading.Condition() + self._event_fetch_lock = threading.Lock() self._event_fetch_list = [] self._event_fetch_ongoing = 0 + self._pending_ds = [] + self.database_engine = hs.database_engine self._stream_id_gen = StreamIdGenerator() @@ -344,8 +346,7 @@ class SQLBaseStore(object): self._clock.looping_call(loop, 10000) - @contextlib.contextmanager - def _new_transaction(self, conn, desc, after_callbacks): + def _new_transaction(self, conn, desc, after_callbacks, func, *args, **kwargs): start = time.time() * 1000 txn_id = self._TXN_ID @@ -366,6 +367,9 @@ class SQLBaseStore(object): txn = LoggingTransaction( txn, name, self.database_engine, after_callbacks ) + r = func(txn, *args, **kwargs) + conn.commit() + return r except self.database_engine.module.OperationalError as e: # This can happen if the database disappears mid # transaction. @@ -398,17 +402,6 @@ class SQLBaseStore(object): ) continue raise - - try: - yield txn - conn.commit() - return - except: - try: - conn.rollback() - except: - pass - raise except Exception as e: logger.debug("[TXN FAIL] {%s} %s", name, e) raise @@ -440,8 +433,9 @@ class SQLBaseStore(object): conn.reconnect() current_context.copy_to(context) - with self._new_transaction(conn, desc, after_callbacks) as txn: - return func(txn, *args, **kwargs) + return self._new_transaction( + conn, desc, after_callbacks, func, *args, **kwargs + ) result = yield preserve_context_over_fn( self._db_pool.runWithConnection, diff --git a/synapse/storage/events.py b/synapse/storage/events.py index b4abd83260..260bdf0ec4 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -420,12 +420,14 @@ class EventsStore(SQLBaseStore): ]) if not txn: + logger.debug("enqueue before") missing_events = yield self._enqueue_events( missing_events_ids, check_redacted=check_redacted, get_prev_content=get_prev_content, allow_rejected=allow_rejected, ) + logger.debug("enqueue after") else: missing_events = self._fetch_events_txn( txn, @@ -498,41 +500,39 @@ class EventsStore(SQLBaseStore): allow_rejected=allow_rejected, )) - @defer.inlineCallbacks - def _enqueue_events(self, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): - if not events: - defer.returnValue({}) - - def do_fetch(conn): - event_list = [] + def _do_fetch(self, conn): + event_list = [] + try: while True: - try: - with self._event_fetch_lock: - i = 0 - while not self._event_fetch_list: - self._event_fetch_ongoing -= 1 - return - - event_list = self._event_fetch_list - self._event_fetch_list = [] - - event_id_lists = zip(*event_list)[0] - event_ids = [ - item for sublist in event_id_lists for item in sublist - ] - - with self._new_transaction(conn, "do_fetch", []) as txn: - rows = self._fetch_event_rows(txn, event_ids) - - row_dict = { - r["event_id"]: r - for r in rows - } + logger.debug("do_fetch getting lock") + with self._event_fetch_lock: + logger.debug("do_fetch go lock: %r", self._event_fetch_list) + event_list = self._event_fetch_list + self._event_fetch_list = [] + if not event_list: + self._event_fetch_ongoing -= 1 + return + + event_id_lists = zip(*event_list)[0] + event_ids = [ + item for sublist in event_id_lists for item in sublist + ] + + rows = self._new_transaction( + conn, "do_fetch", [], self._fetch_event_rows, event_ids + ) - for ids, d in event_list: - def fire(): - if not d.called: + row_dict = { + r["event_id"]: r + for r in rows + } + + logger.debug("do_fetch got events: %r", row_dict.keys()) + + def fire(evs): + for ids, d in evs: + if not d.called: + try: d.callback( [ row_dict[i] @@ -540,32 +540,51 @@ class EventsStore(SQLBaseStore): if i in row_dict ] ) - reactor.callFromThread(fire) - except Exception as e: - logger.exception("do_fetch") - for _, d in event_list: - if not d.called: - reactor.callFromThread(d.errback, e) + except: + logger.exception("Failed to callback") + reactor.callFromThread(fire, event_list) + except Exception as e: + logger.exception("do_fetch") - with self._event_fetch_lock: - self._event_fetch_ongoing -= 1 - return + def fire(evs): + for _, d in evs: + if not d.called: + d.errback(e) + + if event_list: + reactor.callFromThread(fire, event_list) + + @defer.inlineCallbacks + def _enqueue_events(self, events, check_redacted=True, + get_prev_content=False, allow_rejected=False): + if not events: + defer.returnValue({}) events_d = defer.Deferred() - with self._event_fetch_lock: - self._event_fetch_list.append( - (events, events_d) - ) + try: + logger.debug("enqueueueueue getting lock") + with self._event_fetch_lock: + logger.debug("enqueue go lock") + self._event_fetch_list.append( + (events, events_d) + ) - self._event_fetch_lock.notify_all() + self._event_fetch_ongoing += 1 - # if self._event_fetch_ongoing < 5: - self._event_fetch_ongoing += 1 self.runWithConnection( - do_fetch + self._do_fetch ) - rows = yield events_d + except Exception as e: + if not events_d.called: + events_d.errback(e) + + logger.debug("events_d before") + try: + rows = yield events_d + except: + logger.exception("events_d") + logger.debug("events_d after") res = yield defer.gatherResults( [ @@ -580,6 +599,7 @@ class EventsStore(SQLBaseStore): ], consumeErrors=True ) + logger.debug("gatherResults after") defer.returnValue({ e.event_id: e @@ -639,7 +659,8 @@ class EventsStore(SQLBaseStore): rejected_reason=row["rejects"], ) for row in rows - ] + ], + consumeErrors=True, ) defer.returnValue({ diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index d16b57c515..af45fc5619 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -357,10 +357,12 @@ class StreamStore(SQLBaseStore): "get_recent_events_for_room", get_recent_events_for_room_txn ) + logger.debug("stream before") events = yield self._get_events( [r["event_id"] for r in rows], get_prev_content=True ) + logger.debug("stream after") self._set_before_and_after(events, rows) diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py index 8c348ecc95..8573f18b55 100644 --- a/tests/storage/test_base.py +++ b/tests/storage/test_base.py @@ -33,8 +33,9 @@ class SQLBaseStoreTestCase(unittest.TestCase): def setUp(self): self.db_pool = Mock(spec=["runInteraction"]) self.mock_txn = Mock() - self.mock_conn = Mock(spec_set=["cursor"]) + self.mock_conn = Mock(spec_set=["cursor", "rollback", "commit"]) self.mock_conn.cursor.return_value = self.mock_txn + self.mock_conn.rollback.return_value = None # Our fake runInteraction just runs synchronously inline def runInteraction(func, *args, **kwargs): -- cgit 1.5.1 From 8eca5bd50abc9deb3cd428f3f6b3b8fbeb8bdee1 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 20 May 2015 13:22:18 +0100 Subject: Fix the presence tests --- tests/handlers/test_presence.py | 13 +++---------- tests/rest/client/v1/test_presence.py | 3 +++ 2 files changed, 6 insertions(+), 10 deletions(-) (limited to 'tests') diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index ee773797e7..12cf5747a2 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -624,6 +624,7 @@ class PresencePushTestCase(MockedDatastorePresenceTestCase): """ PRESENCE_LIST = { 'apple': [ "@banana:test", "@clementine:test" ], + 'banana': [ "@apple:test" ], } @defer.inlineCallbacks @@ -836,12 +837,7 @@ class PresencePushTestCase(MockedDatastorePresenceTestCase): @defer.inlineCallbacks def test_recv_remote(self): - # TODO(paul): Gut-wrenching - potato_set = self.handler._remote_recvmap.setdefault(self.u_potato, - set()) - potato_set.add(self.u_apple) - - self.room_members = [self.u_banana, self.u_potato] + self.room_members = [self.u_apple, self.u_banana, self.u_potato] self.assertEquals(self.event_source.get_current_key(), 0) @@ -886,11 +882,8 @@ class PresencePushTestCase(MockedDatastorePresenceTestCase): @defer.inlineCallbacks def test_recv_remote_offline(self): """ Various tests relating to SYN-261 """ - potato_set = self.handler._remote_recvmap.setdefault(self.u_potato, - set()) - potato_set.add(self.u_apple) - self.room_members = [self.u_banana, self.u_potato] + self.room_members = [self.u_apple, self.u_banana, self.u_potato] self.assertEquals(self.event_source.get_current_key(), 0) diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index 29c0038f06..8f3df92418 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -295,6 +295,9 @@ class PresenceEventStreamTestCase(unittest.TestCase): else: return [] hs.handlers.room_member_handler.get_joined_rooms_for_user = get_rooms_for_user + hs.handlers.room_member_handler.get_room_members = ( + lambda r: self.room_members if r == "a-room" else [] + ) self.mock_datastore = hs.get_datastore() self.mock_datastore.get_app_service_by_token = Mock(return_value=None) -- cgit 1.5.1 From 88f1ea36cedff158e7a595c84bf949286fa38532 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 21 May 2015 15:23:40 +0100 Subject: Oops, get_rooms_for_user returns a namedtuple, not a room_id --- synapse/notifier.py | 1 + tests/rest/client/v1/test_presence.py | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) (limited to 'tests') diff --git a/synapse/notifier.py b/synapse/notifier.py index 1e73d52c4d..4f47f88df8 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -296,6 +296,7 @@ class Notifier(object): appservice = yield self.store.get_app_service_by_user_id(user) current_token = yield self.event_sources.get_current_token() rooms = yield self.store.get_rooms_for_user(user) + rooms = [room.room_id for room in rooms] user_stream = _NotifierUserStream( user=user, rooms=rooms, diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index 29c0038f06..21f42b3d3e 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -29,6 +29,8 @@ from synapse.rest.client.v1 import events from synapse.types import UserID from synapse.util.async import run_on_reactor +from collections import namedtuple + OFFLINE = PresenceState.OFFLINE UNAVAILABLE = PresenceState.UNAVAILABLE @@ -302,7 +304,10 @@ class PresenceEventStreamTestCase(unittest.TestCase): return_value=defer.succeed(None) ) self.mock_datastore.get_rooms_for_user = ( - lambda u: get_rooms_for_user(UserID.from_string(u)) + lambda u: [ + namedtuple("Room", "room_id")(r) + for r in get_rooms_for_user(UserID.from_string(u)) + ] ) def get_profile_displayname(user_id): -- cgit 1.5.1 From f43544eecc362943f9d64a004d40984739a68d98 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 22 May 2015 11:01:28 +0100 Subject: Make the appservice use 'users_in_room' rather than get_room_members since it is cached --- synapse/appservice/__init__.py | 6 +++--- synapse/handlers/appservice.py | 5 +---- tests/appservice/test_appservice.py | 15 +++------------ 3 files changed, 7 insertions(+), 19 deletions(-) (limited to 'tests') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 63a18b802b..e3ca45de83 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -148,8 +148,8 @@ class ApplicationService(object): and self.is_interested_in_user(event.state_key)): return True # check joined member events - for member in member_list: - if self.is_interested_in_user(member.state_key): + for user_id in member_list: + if self.is_interested_in_user(user_id): return True return False @@ -173,7 +173,7 @@ class ApplicationService(object): restrict_to(str): The namespace to restrict regex tests to. aliases_for_event(list): A list of all the known room aliases for this event. - member_list(list): A list of all joined room members in this room. + member_list(list): A list of all joined user_ids in this room. Returns: bool: True if this service would like to know about this event. """ diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 355ab317df..05735137d8 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -147,10 +147,7 @@ class ApplicationServicesHandler(object): ) # We need to know the members associated with this event.room_id, # if any. - member_list = yield self.store.get_room_members( - room_id=event.room_id, - membership=Membership.JOIN - ) + member_list = yield self.store.get_users_in_room(event.room_id) services = yield self.store.get_app_services() interested_list = [ diff --git a/tests/appservice/test_appservice.py b/tests/appservice/test_appservice.py index 62149d6902..8ce8dc0a87 100644 --- a/tests/appservice/test_appservice.py +++ b/tests/appservice/test_appservice.py @@ -217,18 +217,9 @@ class ApplicationServiceTestCase(unittest.TestCase): _regex("@irc_.*") ) join_list = [ - Mock( - type="m.room.member", room_id="!foo:bar", sender="@alice:here", - state_key="@alice:here" - ), - Mock( - type="m.room.member", room_id="!foo:bar", sender="@irc_fo:here", - state_key="@irc_fo:here" # AS user - ), - Mock( - type="m.room.member", room_id="!foo:bar", sender="@bob:here", - state_key="@bob:here" - ) + "@alice:here", + "@irc_fo:here", # AS user + "@bob:here", ] self.event.sender = "@xmpp_foobar:matrix.org" -- cgit 1.5.1 From 17167898c816c95db8a3f4661d955f43ad6a87ce Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 22 May 2015 16:22:54 +0100 Subject: Fix the presence tests --- tests/handlers/test_presence.py | 16 ++++++++++------ tests/handlers/test_presencelike.py | 20 +++++++++++--------- tests/rest/client/v1/test_presence.py | 4 ++-- 3 files changed, 23 insertions(+), 17 deletions(-) (limited to 'tests') diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 12cf5747a2..29372d488a 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -233,7 +233,7 @@ class MockedDatastorePresenceTestCase(PresenceTestCase): if not user_localpart in self.PRESENCE_LIST: return defer.succeed([]) return defer.succeed([ - {"observed_user_id": u} for u in + {"observed_user_id": u, "accepted": accepted} for u in self.PRESENCE_LIST[user_localpart]]) datastore.get_presence_list = get_presence_list @@ -734,10 +734,12 @@ class PresencePushTestCase(MockedDatastorePresenceTestCase): self.assertEquals( [ - {"observed_user": self.u_banana, - "presence": OFFLINE}, + {"observed_user": self.u_banana, + "presence": OFFLINE, + "accepted": True}, {"observed_user": self.u_clementine, - "presence": OFFLINE}, + "presence": OFFLINE, + "accepted": True}, ], presence ) @@ -758,9 +760,11 @@ class PresencePushTestCase(MockedDatastorePresenceTestCase): self.assertEquals([ {"observed_user": self.u_banana, "presence": ONLINE, - "last_active_ago": 2000}, + "last_active_ago": 2000, + "accepted": True}, {"observed_user": self.u_clementine, - "presence": OFFLINE}, + "presence": OFFLINE, + "accepted": True}, ], presence) (events, _) = yield self.event_source.get_new_events_for_user( diff --git a/tests/handlers/test_presencelike.py b/tests/handlers/test_presencelike.py index 1f2e66ac11..19107caeee 100644 --- a/tests/handlers/test_presencelike.py +++ b/tests/handlers/test_presencelike.py @@ -101,8 +101,8 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): self.datastore.get_profile_avatar_url = get_profile_avatar_url self.presence_list = [ - {"observed_user_id": "@banana:test"}, - {"observed_user_id": "@clementine:test"}, + {"observed_user_id": "@banana:test", "accepted": True}, + {"observed_user_id": "@clementine:test", "accepted": True}, ] def get_presence_list(user_localpart, accepted=None): return defer.succeed(self.presence_list) @@ -144,8 +144,8 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): @defer.inlineCallbacks def test_set_my_state(self): self.presence_list = [ - {"observed_user_id": "@banana:test"}, - {"observed_user_id": "@clementine:test"}, + {"observed_user_id": "@banana:test", "accepted": True}, + {"observed_user_id": "@clementine:test", "accepted": True}, ] mocked_set = self.datastore.set_presence_state @@ -167,8 +167,8 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): self.mock_get_joined.side_effect = get_joined self.presence_list = [ - {"observed_user_id": "@banana:test"}, - {"observed_user_id": "@clementine:test"}, + {"observed_user_id": "@banana:test", "accepted": True}, + {"observed_user_id": "@clementine:test", "accepted": True}, ] self.datastore.set_presence_state.return_value = defer.succeed( @@ -203,9 +203,11 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): "presence": ONLINE, "last_active_ago": 0, "displayname": "Frank", - "avatar_url": "http://foo"}, + "avatar_url": "http://foo", + "accepted": True}, {"observed_user": self.u_clementine, - "presence": OFFLINE} + "presence": OFFLINE, + "accepted": True} ], presence) self.mock_update_client.assert_has_calls([ @@ -233,7 +235,7 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): @defer.inlineCallbacks def test_push_remote(self): self.presence_list = [ - {"observed_user_id": "@potato:remote"}, + {"observed_user_id": "@potato:remote", "accepted": True}, ] self.datastore.set_presence_state.return_value = defer.succeed( diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index 523b30cf8a..4b32c7a203 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -183,7 +183,7 @@ class PresenceListTestCase(unittest.TestCase): @defer.inlineCallbacks def test_get_my_list(self): self.datastore.get_presence_list.return_value = defer.succeed( - [{"observed_user_id": "@banana:test"}], + [{"observed_user_id": "@banana:test", "accepted": True}], ) (code, response) = yield self.mock_resource.trigger("GET", @@ -191,7 +191,7 @@ class PresenceListTestCase(unittest.TestCase): self.assertEquals(200, code) self.assertEquals([ - {"user_id": "@banana:test", "presence": OFFLINE}, + {"user_id": "@banana:test", "presence": OFFLINE, "accepted": True}, ], response) self.datastore.get_presence_list.assert_called_with( -- cgit 1.5.1