From 5002056b16549ba2a9175ab62897014ee58e73ff Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Tue, 12 May 2015 11:20:40 +0100
Subject: SYN-377: Make sure that the StreamIdGenerator.get_next.__exit__ is
 called from the main thread after the transaction completes, not from the
 database thread before the transaction completes.
---
 synapse/storage/events.py | 38 +++++++++++++++++++-------------------
 1 file changed, 19 insertions(+), 19 deletions(-)

(limited to 'synapse/storage/events.py')

diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 38395c66ab..626a5eaf6e 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -23,6 +23,7 @@ from synapse.crypto.event_signing import compute_event_reference_hash
 from syutil.base64util import decode_base64
 from syutil.jsonutil import encode_canonical_json
+from contextlib import contextmanager
 import logging
@@ -41,17 +42,25 @@ class EventsStore(SQLBaseStore):
             self.min_token -= 1
             stream_ordering = self.min_token
+        if stream_ordering is None:
+            stream_ordering_manager = yield self._stream_id_gen.get_next(self)
+        else:
+            @contextmanager
+            def stream_ordering_manager():
+                yield stream_ordering
+
         try:
-            yield self.runInteraction(
-                "persist_event",
-                self._persist_event_txn,
-                event=event,
-                context=context,
-                backfilled=backfilled,
-                stream_ordering=stream_ordering,
-                is_new_state=is_new_state,
-                current_state=current_state,
-            )
+            with stream_ordering_manager as stream_ordering:
+                yield self.runInteraction(
+                    "persist_event",
+                    self._persist_event_txn,
+                    event=event,
+                    context=context,
+                    backfilled=backfilled,
+                    stream_ordering=stream_ordering,
+                    is_new_state=is_new_state,
+                    current_state=current_state,
+                )
         except _RollbackButIsFineException:
             pass
@@ -95,15 +104,6 @@ class EventsStore(SQLBaseStore):
         # Remove any existing cache entries for the event_id
         txn.call_after(self._invalidate_get_event_cache, event.event_id)
-        if stream_ordering is None:
-            with self._stream_id_gen.get_next_txn(txn) as stream_ordering:
-                return self._persist_event_txn(
-                    txn, event, context, backfilled,
-                    stream_ordering=stream_ordering,
-                    is_new_state=is_new_state,
-                    current_state=current_state,
-                )
-
         # We purposefully do this first since if we include a `current_state`
         # key, we *want* to update the `current_state_events` table
         if current_state:
-- cgit 1.4.1

From 74850d7f75f64e537c3db36103107aece0fdf47f Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 12 May 2015 14:14:58 +0100
Subject: Do state groups persistence /after/ checking if we have already
 persisted the event
---
 synapse/storage/events.py | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

(limited to 'synapse/storage/events.py')

diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 38395c66ab..a66e84b344 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -135,19 +135,17 @@ class EventsStore(SQLBaseStore):
         outlier = event.internal_metadata.is_outlier()
         if not outlier:
-            self._store_state_groups_txn(txn, event, context)
-
             self._update_min_depth_for_room_txn(
                 txn,
                 event.room_id,
                 event.depth
             )
-        have_persisted = self._simple_select_one_onecol_txn(
+        have_persisted = self._simple_select_one_txn(
             txn,
-            table="event_json",
+            table="events",
             keyvalues={"event_id": event.event_id},
-            retcol="event_id",
+            retcols=["event_id", "outlier"],
             allow_none=True,
         )
@@ -162,7 +160,9 @@ class EventsStore(SQLBaseStore):
         # if we are persisting an event that we had persisted as an outlier,
         # but is no longer one.
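The shape the SYN-377 patch introduces is easier to see in isolation: persist_event now obtains its stream ordering through a context manager, so __exit__ runs on the calling thread once runInteraction has finished, rather than on the database thread mid-transaction. Below is a minimal sketch of that pattern; FakeStreamIdGenerator and persist are illustrative stand-ins, not Synapse's real classes.

from contextlib import contextmanager

class FakeStreamIdGenerator(object):
    # Illustrative stand-in for StreamIdGenerator.
    def __init__(self):
        self._current = 0

    @contextmanager
    def get_next(self):
        self._current += 1
        # The `with` body (the whole transaction) runs at this yield;
        # everything after it is the __exit__ path, executed on the
        # caller's thread once the transaction has completed.
        yield self._current

def persist(id_gen, stream_ordering=None):
    if stream_ordering is None:
        manager = id_gen.get_next()
    else:
        # Wrap a pre-chosen ordering so both branches use `with`.
        @contextmanager
        def fixed():
            yield stream_ordering
        manager = fixed()  # note: the decorated factory must be called
    with manager as ordering:
        return ordering  # stand-in for the runInteraction(...) call

print(persist(FakeStreamIdGenerator()))                     # -> 1
print(persist(FakeStreamIdGenerator(), stream_ordering=7))  # -> 7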
if have_persisted: - if not outlier: + if not outlier and have_persisted["outlier"]: + self._store_state_groups_txn(txn, event, context) + sql = ( "UPDATE event_json SET internal_metadata = ?" " WHERE event_id = ?" @@ -182,6 +182,9 @@ class EventsStore(SQLBaseStore): ) return + if not outlier: + self._store_state_groups_txn(txn, event, context) + self._handle_prev_events( txn, outlier=outlier, -- cgit 1.4.1 From e12268597804d759f82ccd827359059ba7cb05ef Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 May 2015 16:12:37 +0100 Subject: You need to call contextmanager --- synapse/storage/events.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a5a6869079..9242b0a84e 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -48,6 +48,7 @@ class EventsStore(SQLBaseStore): @contextmanager def stream_ordering_manager(): yield stream_ordering + stream_ordering_manager = stream_ordering_manager() try: with stream_ordering_manager as stream_ordering: -- cgit 1.4.1 From 63878c03794d33a8767425e114845159e5c1cb9a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 13 May 2015 13:42:21 +0100 Subject: Don't bother checking for updates if the stream token hasn't advanced for a user --- synapse/handlers/_base.py | 7 +++- synapse/handlers/federation.py | 25 +++++++----- synapse/handlers/presence.py | 4 ++ synapse/handlers/typing.py | 4 +- synapse/notifier.py | 75 ++++++++++++++++++++++++----------- synapse/storage/events.py | 3 ++ synapse/types.py | 19 ++++++++- tests/handlers/test_federation.py | 4 +- tests/handlers/test_room.py | 8 ++-- tests/handlers/test_typing.py | 12 +++--- tests/rest/client/v1/test_presence.py | 15 ++++--- tests/utils.py | 2 +- 12 files changed, 123 insertions(+), 55 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index ddc5c21e7d..833ff41377 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -105,7 +105,9 @@ class BaseHandler(object): if not suppress_auth: self.auth.check(event, auth_events=context.current_state) - yield self.store.persist_event(event, context=context) + (event_stream_id, max_stream_id) = yield self.store.persist_event( + event, context=context + ) federation_handler = self.hs.get_handlers().federation_handler @@ -142,7 +144,8 @@ class BaseHandler(object): with PreserveLoggingContext(): # Don't block waiting on waking up all the listeners. 
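The one-line "You need to call contextmanager" fix above is a classic gotcha: @contextmanager turns a generator function into a factory, and only the object returned by calling that factory implements __enter__/__exit__. A minimal reproduction, with an illustrative constant in place of the real stream ordering:

from contextlib import contextmanager

@contextmanager
def stream_ordering_manager():
    yield 42

# Buggy: `with stream_ordering_manager as ordering:` hands the bare
# function to the with statement, which fails at runtime because the
# function itself has no __enter__/__exit__.

# Fixed: call the factory first.
with stream_ordering_manager() as ordering:
    assert ordering == 42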
notify_d = self.notifier.on_new_room_event( - event, extra_users=extra_users + event, event_stream_id, max_stream_id, + extra_users=extra_users ) def log_failure(f): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7d9906039e..bc0f7b0ee7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -160,7 +160,7 @@ class FederationHandler(BaseHandler): ) try: - yield self._handle_new_event( + _, event_stream_id, max_stream_id = yield self._handle_new_event( origin, event, state=state, @@ -203,7 +203,8 @@ class FederationHandler(BaseHandler): with PreserveLoggingContext(): d = self.notifier.on_new_room_event( - event, extra_users=extra_users + event, event_stream_id, max_stream_id, + extra_users=extra_users ) def log_failure(f): @@ -561,7 +562,7 @@ class FederationHandler(BaseHandler): if e.event_id in auth_ids } - yield self._handle_new_event( + _, event_stream_id, max_stream_id = yield self._handle_new_event( origin, new_event, state=state, @@ -571,7 +572,8 @@ class FederationHandler(BaseHandler): with PreserveLoggingContext(): d = self.notifier.on_new_room_event( - new_event, extra_users=[joinee] + new_event, event_stream_id, max_stream_id, + extra_users=[joinee] ) def log_failure(f): @@ -637,7 +639,9 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = False - context = yield self._handle_new_event(origin, event) + context, event_stream_id, max_stream_id = yield self._handle_new_event( + origin, event + ) logger.debug( "on_send_join_request: After _handle_new_event: %s, sigs: %s", @@ -653,7 +657,7 @@ class FederationHandler(BaseHandler): with PreserveLoggingContext(): d = self.notifier.on_new_room_event( - event, extra_users=extra_users + event, event_stream_id, max_stream_id, extra_users=extra_users ) def log_failure(f): @@ -727,7 +731,7 @@ class FederationHandler(BaseHandler): context = yield self.state_handler.compute_event_context(event) - yield self.store.persist_event( + event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, backfilled=False, @@ -736,7 +740,8 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(event.state_key) with PreserveLoggingContext(): d = self.notifier.on_new_room_event( - event, extra_users=[target_user], + event, event_stream_id, max_stream_id, + extra_users=[target_user], ) def log_failure(f): @@ -914,7 +919,7 @@ class FederationHandler(BaseHandler): ) raise - yield self.store.persist_event( + event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, backfilled=backfilled, @@ -922,7 +927,7 @@ class FederationHandler(BaseHandler): current_state=current_state, ) - defer.returnValue(context) + defer.returnValue((context, event_stream_id, max_stream_id)) @defer.inlineCallbacks def on_query_auth(self, origin, event_id, remote_auth_chain, rejects, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 28688d532d..7db4b062d2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -345,6 +345,8 @@ class PresenceHandler(BaseHandler): curr_users = yield rm_handler.get_room_members(room_id) for local_user in [c for c in curr_users if self.hs.is_mine(c)]: + statuscache = self._get_or_offline_usercache(local_user) + statuscache.update({}, serial=self._user_cachemap_latest_serial) self.push_update_to_local_and_remote( observed_user=local_user, users_to_push=[user], @@ -820,6 +822,8 @@ class PresenceHandler(BaseHandler): room_ids=[], statuscache=None): with 
PreserveLoggingContext(): self.notifier.on_new_user_event( + "presence_key", + self._user_cachemap_latest_serial, users_to_push, room_ids, ) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 64fe51aa3e..a9895292c2 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -218,7 +218,9 @@ class TypingNotificationHandler(BaseHandler): self._room_serials[room_id] = self._latest_room_serial with PreserveLoggingContext(): - self.notifier.on_new_user_event(rooms=[room_id]) + self.notifier.on_new_user_event( + "typing_key", self._latest_room_serial, rooms=[room_id] + ) class TypingNotificationEventSource(object): diff --git a/synapse/notifier.py b/synapse/notifier.py index 214a2b28ca..4d10c05038 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -52,12 +52,11 @@ class _NotificationListener(object): def notified(self): return self.deferred.called - def notify(self): + def notify(self, token): """ Inform whoever is listening about the new events. """ - try: - self.deferred.callback(None) + self.deferred.callback(token) except defer.AlreadyCalledError: pass @@ -73,15 +72,18 @@ class _NotifierUserStream(object): """ def __init__(self, user, rooms, current_token, appservice=None): - self.user = user + self.user = str(user) self.appservice = appservice self.listeners = set() - self.rooms = rooms + self.rooms = set(rooms) self.current_token = current_token - def notify(self, new_token): + def notify(self, stream_key, stream_id): + self.current_token = self.current_token.copy_and_replace( + stream_key, stream_id + ) for listener in self.listeners: - listener.notify(new_token) + listener.notify(self.current_token) self.listeners.clear() def remove(self, notifier): @@ -117,6 +119,7 @@ class Notifier(object): self.event_sources = hs.get_event_sources() self.store = hs.get_datastore() + self.pending_new_room_events = [] self.clock = hs.get_clock() @@ -153,9 +156,21 @@ class Notifier(object): lambda: count(bool, self.appservice_to_user_streams.values()), ) + def notify_pending_new_room_events(self, max_room_stream_id): + pending = sorted(self.pending_new_room_events) + self.pending_new_room_events = [] + for event, room_stream_id, extra_users in pending: + if room_stream_id > max_room_stream_id: + self.pending_new_room_events.append(( + event, room_stream_id, extra_users + )) + else: + self._on_new_room_event(event, room_stream_id, extra_users) + @log_function @defer.inlineCallbacks - def on_new_room_event(self, event, new_token, extra_users=[]): + def on_new_room_event(self, event, room_stream_id, max_room_stream_id, + extra_users=[]): """ Used by handlers to inform the notifier something has happened in the room, room event wise. @@ -163,8 +178,18 @@ class Notifier(object): listening to the room, and any listeners for the users in the `extra_users` param. """ - assert isinstance(new_token, StreamToken) yield run_on_reactor() + + self.notify_pending_new_room_events(max_room_stream_id) + + if room_stream_id > max_room_stream_id: + self.pending_new_room_events.append(( + event, room_stream_id, extra_users + )) + else: + self._on_new_room_event(event, room_stream_id, extra_users) + + def _on_new_room_event(self, event, room_stream_id, extra_users=[]): # poke any interested application service. 
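_NotifierUserStream.notify above advances its token one stream at a time with copy_and_replace. Because StreamToken is namedtuple-based, replacing a single field just rebuilds the tuple, leaving the old token usable by any other listener. A sketch with an illustrative two-field token rather than the real StreamToken:

from collections import namedtuple

class SimpleToken(namedtuple("SimpleToken", ("room_key", "typing_key"))):
    def copy_and_replace(self, key, new_value):
        d = self._asdict()
        d[key] = new_value
        return SimpleToken(**d)

tok = SimpleToken(room_key="s5", typing_key=3)
tok2 = tok.copy_and_replace("typing_key", 4)
assert tok2 == SimpleToken("s5", 4)
assert tok.typing_key == 3  # the original token is untouched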
self.hs.get_handlers().appservice_handler.notify_interested_services( event @@ -197,33 +222,32 @@ class Notifier(object): for user_stream in user_streams: try: - user_stream.notify(new_token) + user_stream.notify("room_key", "s%d" % (room_stream_id,)) except: logger.exception("Failed to notify listener") @defer.inlineCallbacks @log_function - def on_new_user_event(self, new_token, users=[], rooms=[]): + def on_new_user_event(self, stream_key, new_token, users=[], rooms=[]): """ Used to inform listeners that something has happend presence/user event wise. Will wake up all listeners for the given users and rooms. """ - assert isinstance(new_token, StreamToken) yield run_on_reactor() user_streams = set() for user in users: user_stream = self.user_to_user_stream.get(user) - if user_stream: - user_stream.add(user_stream) + if user_stream is not None: + user_streams.add(user_stream) for room in rooms: user_streams |= self.room_to_user_streams.get(room, set()) for user_stream in user_streams: try: - user_streams.notify(new_token) + user_stream.notify(stream_key, new_token) except: logger.exception("Failed to notify listener") @@ -236,12 +260,12 @@ class Notifier(object): deferred = defer.Deferred() - user_stream = self.user_to_user_streams.get(user) + user = str(user) + user_stream = self.user_to_user_stream.get(user) if user_stream is None: - appservice = yield self.store.get_app_service_by_user_id( - user.to_string() - ) + appservice = yield self.store.get_app_service_by_user_id(user) current_token = yield self.event_sources.get_current_token() + rooms = yield self.store.get_rooms_for_user(user) user_stream = _NotifierUserStream( user=user, rooms=rooms, @@ -252,8 +276,9 @@ class Notifier(object): else: current_token = user_stream.current_token + listener = [_NotificationListener(deferred)] + if timeout and not current_token.is_after(from_token): - listener = [_NotificationListener(deferred)] user_stream.listeners.add(listener[0]) if current_token.is_after(from_token): @@ -334,7 +359,7 @@ class Notifier(object): self.user_to_user_stream[user_stream.user] = user_stream for room in user_stream.rooms: - s = self.room_to_user_stream.setdefault(room, set()) + s = self.room_to_user_streams.setdefault(room, set()) s.add(user_stream) if user_stream.appservice: @@ -343,10 +368,12 @@ class Notifier(object): ).add(user_stream) def _user_joined_room(self, user, room_id): + user = str(user) new_user_stream = self.user_to_user_stream.get(user) - room_streams = self.room_to_user_streams.setdefault(room_id, set()) - room_streams.add(new_user_stream) - new_user_stream.rooms.add(room_id) + if new_user_stream is not None: + room_streams = self.room_to_user_streams.setdefault(room_id, set()) + room_streams.add(new_user_stream) + new_user_stream.rooms.add(room_id) def _discard_if_notified(listener_set): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a5a6869079..7d6df5f4c6 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -64,6 +64,9 @@ class EventsStore(SQLBaseStore): except _RollbackButIsFineException: pass + max_persisted_id = yield self._stream_id_gen.get_max_token(self) + defer.returnValue((stream_ordering, max_persisted_id)) + @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False, diff --git a/synapse/types.py b/synapse/types.py index 0f16867d75..d89a04f7c3 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -70,6 +70,8 @@ class DomainSpecificString( """Return a string encoding the fields of 
the structure object.""" return "%s%s:%s" % (self.SIGIL, self.localpart, self.domain) + __str__ = to_string + @classmethod def create(cls, localpart, domain,): return cls(localpart=localpart, domain=domain) @@ -107,7 +109,6 @@ class StreamToken( def from_string(cls, string): try: keys = string.split(cls._SEPARATOR) - return cls(*keys) except: raise SynapseError(400, "Invalid Token") @@ -115,6 +116,22 @@ class StreamToken( def to_string(self): return self._SEPARATOR.join([str(k) for k in self]) + @property + def room_stream_id(self): + # TODO(markjh): Awful hack to work around hacks in the presence tests + if type(self.room_key) is int: + return self.room_key + else: + return int(self.room_key[1:].split("-")[-1]) + + def is_after(self, other_token): + """Does this token contain events that the other doesn't?""" + return ( + (other_token.room_stream_id < self.room_stream_id) + or (int(other_token.presence_key) < int(self.presence_key)) + or (int(other_token.typing_key) < int(self.typing_key)) + ) + def copy_and_replace(self, key, new_value): d = self._asdict() d[key] = new_value diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index 08d2404b6c..f3821242bc 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -83,7 +83,7 @@ class FederationTestCase(unittest.TestCase): "hashes": {"sha256":"AcLrgtUIqqwaGoHhrEvYG1YLDIsVPYJdSRGhkp3jJp8"}, }) - self.datastore.persist_event.return_value = defer.succeed(None) + self.datastore.persist_event.return_value = defer.succeed((1,1)) self.datastore.get_room.return_value = defer.succeed(True) self.auth.check_host_in_room.return_value = defer.succeed(True) @@ -126,5 +126,5 @@ class FederationTestCase(unittest.TestCase): self.auth.check.assert_called_once_with(ANY, auth_events={}) self.notifier.on_new_room_event.assert_called_once_with( - ANY, extra_users=[] + ANY, 1, 1, extra_users=[] ) diff --git a/tests/handlers/test_room.py b/tests/handlers/test_room.py index 6417f73309..a2d7635995 100644 --- a/tests/handlers/test_room.py +++ b/tests/handlers/test_room.py @@ -87,6 +87,8 @@ class RoomMemberHandlerTestCase(unittest.TestCase): self.ratelimiter = hs.get_ratelimiter() self.ratelimiter.send_message.return_value = (True, 0) + self.datastore.persist_event.return_value = (1,1) + @defer.inlineCallbacks def test_invite(self): room_id = "!foo:red" @@ -160,7 +162,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): event, context=context, ) self.notifier.on_new_room_event.assert_called_once_with( - event, extra_users=[UserID.from_string(target_user_id)] + event, 1, 1, extra_users=[UserID.from_string(target_user_id)] ) self.assertFalse(self.datastore.get_room.called) self.assertFalse(self.datastore.store_room.called) @@ -226,7 +228,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): event, context=context ) self.notifier.on_new_room_event.assert_called_once_with( - event, extra_users=[user] + event, 1, 1, extra_users=[user] ) join_signal_observer.assert_called_with( @@ -304,7 +306,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): event, context=context ) self.notifier.on_new_room_event.assert_called_once_with( - event, extra_users=[user] + event, 1, 1, extra_users=[user] ) leave_signal_observer.assert_called_with( diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index b318d4944a..7ccbe2ea9c 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -183,7 +183,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ) 
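The room_stream_id property added to StreamToken above is parsing a serialized key. A hedged sketch of what it accepts, assuming room keys of the form "s<stream>" for stream positions and "t<topological>-<stream>" for topological ones (the format is inferred from the parsing code, not stated in the patch):

def room_stream_id(room_key):
    if isinstance(room_key, int):
        # The "awful hack" noted in the patch: presence tests pass ints.
        return room_key
    # "s42" -> 42, "t12-42" -> 42: the trailing integer is the stream id.
    return int(room_key[1:].split("-")[-1])

assert room_stream_id("s42") == 42
assert room_stream_id("t12-42") == 42
assert room_stream_id(7) == 7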
self.on_new_user_event.assert_has_calls([ - call(rooms=[self.room_id]), + call('typing_key', 1, rooms=[self.room_id]), ]) self.assertEquals(self.event_source.get_current_key(), 1) @@ -246,7 +246,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ) self.on_new_user_event.assert_has_calls([ - call(rooms=[self.room_id]), + call('typing_key', 1, rooms=[self.room_id]), ]) self.assertEquals(self.event_source.get_current_key(), 1) @@ -300,7 +300,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ) self.on_new_user_event.assert_has_calls([ - call(rooms=[self.room_id]), + call('typing_key', 1, rooms=[self.room_id]), ]) yield put_json.await_calls() @@ -332,7 +332,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ) self.on_new_user_event.assert_has_calls([ - call(rooms=[self.room_id]), + call('typing_key', 1, rooms=[self.room_id]), ]) self.on_new_user_event.reset_mock() @@ -352,7 +352,7 @@ class TypingNotificationsTestCase(unittest.TestCase): self.clock.advance_time(11) self.on_new_user_event.assert_has_calls([ - call(rooms=[self.room_id]), + call('typing_key', 2, rooms=[self.room_id]), ]) self.assertEquals(self.event_source.get_current_key(), 2) @@ -378,7 +378,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ) self.on_new_user_event.assert_has_calls([ - call(rooms=[self.room_id]), + call('typing_key', 3, rooms=[self.room_id]), ]) self.on_new_user_event.reset_mock() diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index 8e0c5fa630..c0c52796ad 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -27,6 +27,7 @@ from synapse.handlers.presence import PresenceHandler from synapse.rest.client.v1 import presence from synapse.rest.client.v1 import events from synapse.types import UserID +from synapse.util.async import run_on_reactor OFFLINE = PresenceState.OFFLINE @@ -264,6 +265,7 @@ class PresenceEventStreamTestCase(unittest.TestCase): datastore=Mock(spec=[ "set_presence_state", "get_presence_list", + "get_rooms_for_user", ]), clock=Mock(spec=[ "call_later", @@ -298,6 +300,9 @@ class PresenceEventStreamTestCase(unittest.TestCase): self.mock_datastore.get_app_service_by_user_id = Mock( return_value=defer.succeed(None) ) + self.mock_datastore.get_rooms_for_user = ( + lambda u: get_rooms_for_user(UserID.from_string(u)) + ) def get_profile_displayname(user_id): return defer.succeed("Frank") @@ -350,19 +355,19 @@ class PresenceEventStreamTestCase(unittest.TestCase): self.mock_datastore.set_presence_state.return_value = defer.succeed( {"state": ONLINE} ) - self.mock_datastore.get_presence_list.return_value = defer.succeed( - [] - ) + self.mock_datastore.get_presence_list.return_value = defer.succeed([]) yield self.presence.set_state(self.u_banana, self.u_banana, state={"presence": ONLINE} ) + yield run_on_reactor() + (code, response) = yield self.mock_resource.trigger("GET", - "/events?from=0_1_0&timeout=0", None) + "/events?from=s0_1_0&timeout=0", None) self.assertEquals(200, code) - self.assertEquals({"start": "0_1_0", "end": "0_2_0", "chunk": [ + self.assertEquals({"start": "s0_1_0", "end": "s0_2_0", "chunk": [ {"type": "m.presence", "content": { "user_id": "@banana:test", diff --git a/tests/utils.py b/tests/utils.py index cc038fecf1..a67530bd63 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -355,7 +355,7 @@ class MemoryDataStore(object): return [] def get_room_events_max_id(self): - return 0 # TODO (erikj) + return "s0" # TODO (erikj) def get_send_event_level(self, room_id): return 
defer.succeed(0) -- cgit 1.4.1 From 386b7330d2902fe8acac0efadb095be389700764 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 14:45:22 +0100 Subject: Move from _base to events --- synapse/storage/_base.py | 233 +------------------------------------------ synapse/storage/events.py | 246 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 247 insertions(+), 232 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 1a76b22e4f..a8f8989e38 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -15,9 +15,7 @@ import logging from synapse.api.errors import StoreError -from synapse.events import FrozenEvent -from synapse.events.utils import prune_event -from synapse.util import unwrap_deferred + from synapse.util.logutils import log_function from synapse.util.logcontext import preserve_context_over_fn, LoggingContext from synapse.util.lrucache import LruCache @@ -29,7 +27,6 @@ from twisted.internet import defer from collections import namedtuple, OrderedDict import functools -import simplejson as json import sys import time import threading @@ -868,234 +865,6 @@ class SQLBaseStore(object): return self.runInteraction("_simple_max_id", func) - @defer.inlineCallbacks - def _get_events(self, event_ids, check_redacted=True, - get_prev_content=False, allow_rejected=False, txn=None): - if not event_ids: - defer.returnValue([]) - - event_map = self._get_events_from_cache( - event_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - - missing_events_ids = [e for e in event_ids if e not in event_map] - - if not missing_events_ids: - defer.returnValue([ - event_map[e_id] for e_id in event_ids - if e_id in event_map and event_map[e_id] - ]) - - def get_missing(txn): - missing_events = unwrap_deferred(self._fetch_events( - txn, - missing_events_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - )) - - event_map.update(missing_events) - - return [ - event_map[e_id] for e_id in event_ids - if e_id in event_map and event_map[e_id] - ] - - if not txn: - res = yield self.runInteraction("_get_events", get_missing) - defer.returnValue(res) - else: - defer.returnValue(get_missing(txn)) - - def _get_events_txn(self, txn, event_ids, check_redacted=True, - get_prev_content=False, allow_rejected=False): - return unwrap_deferred(self._get_events( - event_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - txn=txn, - )) - - def _invalidate_get_event_cache(self, event_id): - for check_redacted in (False, True): - for get_prev_content in (False, True): - self._get_event_cache.invalidate(event_id, check_redacted, - get_prev_content) - - def _get_event_txn(self, txn, event_id, check_redacted=True, - get_prev_content=False, allow_rejected=False): - - events = self._get_events_txn( - txn, [event_id], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - - return events[0] if events else None - - def _get_events_from_cache(self, events, check_redacted, get_prev_content, - allow_rejected): - event_map = {} - - for event_id in events: - try: - ret = self._get_event_cache.get( - event_id, check_redacted, get_prev_content - ) - - if allow_rejected or not ret.rejected_reason: - event_map[event_id] = ret - else: - event_map[event_id] = None - except KeyError: - pass - - return event_map - - 
@defer.inlineCallbacks - def _fetch_events(self, txn, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): - if not events: - defer.returnValue({}) - - rows = [] - N = 200 - for i in range(1 + len(events) / N): - evs = events[i*N:(i + 1)*N] - if not evs: - break - - sql = ( - "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " - " FROM event_json as e" - " LEFT JOIN rejections as rej USING (event_id)" - " LEFT JOIN redactions as r ON e.event_id = r.redacts" - " WHERE e.event_id IN (%s)" - ) % (",".join(["?"]*len(evs)),) - - if txn: - txn.execute(sql, evs) - rows.extend(txn.fetchall()) - else: - res = yield self._execute("_fetch_events", None, sql, *evs) - rows.extend(res) - - res = yield defer.gatherResults( - [ - defer.maybeDeferred( - self._get_event_from_row, - txn, - row[0], row[1], row[2], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - rejected_reason=row[3], - ) - for row in rows - ], - consumeErrors=True, - ) - - for e in res: - self._get_event_cache.prefill( - e.event_id, check_redacted, get_prev_content, e - ) - - defer.returnValue({ - e.event_id: e - for e in res if e - }) - - @defer.inlineCallbacks - def _get_event_from_row(self, txn, internal_metadata, js, redacted, - check_redacted=True, get_prev_content=False, - rejected_reason=None): - d = json.loads(js) - internal_metadata = json.loads(internal_metadata) - - def select(txn, *args, **kwargs): - if txn: - return self._simple_select_one_onecol_txn(txn, *args, **kwargs) - else: - return self._simple_select_one_onecol( - *args, - desc="_get_event_from_row", **kwargs - ) - - def get_event(txn, *args, **kwargs): - if txn: - return self._get_event_txn(txn, *args, **kwargs) - else: - return self.get_event(*args, **kwargs) - - if rejected_reason: - rejected_reason = yield select( - txn, - table="rejections", - keyvalues={"event_id": rejected_reason}, - retcol="reason", - ) - - ev = FrozenEvent( - d, - internal_metadata_dict=internal_metadata, - rejected_reason=rejected_reason, - ) - - if check_redacted and redacted: - ev = prune_event(ev) - - redaction_id = yield select( - txn, - table="redactions", - keyvalues={"redacts": ev.event_id}, - retcol="event_id", - ) - - ev.unsigned["redacted_by"] = redaction_id - # Get the redaction event. - - because = yield get_event( - txn, - redaction_id, - check_redacted=False - ) - - if because: - ev.unsigned["redacted_because"] = because - - if get_prev_content and "replaces_state" in ev.unsigned: - prev = yield get_event( - txn, - ev.unsigned["replaces_state"], - get_prev_content=False, - ) - if prev: - ev.unsigned["prev_content"] = prev.get_dict()["content"] - - defer.returnValue(ev) - - def _parse_events(self, rows): - return self.runInteraction( - "_parse_events", self._parse_events_txn, rows - ) - - def _parse_events_txn(self, txn, rows): - event_ids = [r["event_id"] for r in rows] - - return self._get_events_txn(txn, event_ids) - - def _has_been_redacted_txn(self, txn, event): - sql = "SELECT event_id FROM redactions WHERE redacts = ?" 
- txn.execute(sql, (event.event_id,)) - result = txn.fetchone() - return result[0] if result else None - def get_next_stream_id(self): with self._next_stream_id_lock: i = self._next_stream_id diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 9242b0a84e..f960ef8350 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -17,6 +17,10 @@ from _base import SQLBaseStore, _RollbackButIsFineException from twisted.internet import defer +from synapse.events import FrozenEvent +from synapse.events.utils import prune_event +from synapse.util import unwrap_deferred + from synapse.util.logutils import log_function from synapse.api.constants import EventTypes from synapse.crypto.event_signing import compute_event_reference_hash @@ -26,6 +30,7 @@ from syutil.jsonutil import encode_canonical_json from contextlib import contextmanager import logging +import simplejson as json logger = logging.getLogger(__name__) @@ -393,3 +398,244 @@ class EventsStore(SQLBaseStore): return self.runInteraction( "have_events", f, ) + + @defer.inlineCallbacks + def _get_events(self, event_ids, check_redacted=True, + get_prev_content=False, allow_rejected=False, txn=None): + if not event_ids: + defer.returnValue([]) + + event_map = self._get_events_from_cache( + event_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + + missing_events_ids = [e for e in event_ids if e not in event_map] + + if not missing_events_ids: + defer.returnValue([ + event_map[e_id] for e_id in event_ids + if e_id in event_map and event_map[e_id] + ]) + + if not txn: + missing_events = yield self.runInteraction( + "_get_events", + self._fetch_events_txn, + missing_events_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + else: + missing_events = yield self._fetch_events( + txn, + missing_events_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + + event_map.update(missing_events) + + defer.returnValue([ + event_map[e_id] for e_id in event_ids + if e_id in event_map and event_map[e_id] + ]) + + + def _get_events_txn(self, txn, event_ids, check_redacted=True, + get_prev_content=False, allow_rejected=False): + return unwrap_deferred(self._get_events( + event_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + txn=txn, + )) + + def _invalidate_get_event_cache(self, event_id): + for check_redacted in (False, True): + for get_prev_content in (False, True): + self._get_event_cache.invalidate(event_id, check_redacted, + get_prev_content) + + def _get_event_txn(self, txn, event_id, check_redacted=True, + get_prev_content=False, allow_rejected=False): + + events = self._get_events_txn( + txn, [event_id], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + + return events[0] if events else None + + def _get_events_from_cache(self, events, check_redacted, get_prev_content, + allow_rejected): + event_map = {} + + for event_id in events: + try: + ret = self._get_event_cache.get( + event_id, check_redacted, get_prev_content + ) + + if allow_rejected or not ret.rejected_reason: + event_map[event_id] = ret + else: + event_map[event_id] = None + except KeyError: + pass + + return event_map + + def _fetch_events_txn(self, txn, events, check_redacted=True, + get_prev_content=False, allow_rejected=False): + return 
unwrap_deferred(self._fetch_events( + txn, events, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + )) + + @defer.inlineCallbacks + def _fetch_events(self, txn, events, check_redacted=True, + get_prev_content=False, allow_rejected=False): + if not events: + defer.returnValue({}) + + rows = [] + N = 200 + for i in range(1 + len(events) / N): + evs = events[i*N:(i + 1)*N] + if not evs: + break + + sql = ( + "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " + " FROM event_json as e" + " LEFT JOIN rejections as rej USING (event_id)" + " LEFT JOIN redactions as r ON e.event_id = r.redacts" + " WHERE e.event_id IN (%s)" + ) % (",".join(["?"]*len(evs)),) + + if txn: + txn.execute(sql, evs) + rows.extend(txn.fetchall()) + else: + res = yield self._execute("_fetch_events", None, sql, *evs) + rows.extend(res) + + res = yield defer.gatherResults( + [ + defer.maybeDeferred( + self._get_event_from_row, + txn, + row[0], row[1], row[2], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + rejected_reason=row[3], + ) + for row in rows + ], + consumeErrors=True, + ) + + for e in res: + self._get_event_cache.prefill( + e.event_id, check_redacted, get_prev_content, e + ) + + defer.returnValue({ + e.event_id: e + for e in res if e + }) + + @defer.inlineCallbacks + def _get_event_from_row(self, txn, internal_metadata, js, redacted, + check_redacted=True, get_prev_content=False, + rejected_reason=None): + d = json.loads(js) + internal_metadata = json.loads(internal_metadata) + + def select(txn, *args, **kwargs): + if txn: + return self._simple_select_one_onecol_txn(txn, *args, **kwargs) + else: + return self._simple_select_one_onecol( + *args, + desc="_get_event_from_row", **kwargs + ) + + def get_event(txn, *args, **kwargs): + if txn: + return self._get_event_txn(txn, *args, **kwargs) + else: + return self.get_event(*args, **kwargs) + + if rejected_reason: + rejected_reason = yield select( + txn, + table="rejections", + keyvalues={"event_id": rejected_reason}, + retcol="reason", + ) + + ev = FrozenEvent( + d, + internal_metadata_dict=internal_metadata, + rejected_reason=rejected_reason, + ) + + if check_redacted and redacted: + ev = prune_event(ev) + + redaction_id = yield select( + txn, + table="redactions", + keyvalues={"redacts": ev.event_id}, + retcol="event_id", + ) + + ev.unsigned["redacted_by"] = redaction_id + # Get the redaction event. + + because = yield get_event( + txn, + redaction_id, + check_redacted=False + ) + + if because: + ev.unsigned["redacted_because"] = because + + if get_prev_content and "replaces_state" in ev.unsigned: + prev = yield get_event( + txn, + ev.unsigned["replaces_state"], + get_prev_content=False, + ) + if prev: + ev.unsigned["prev_content"] = prev.get_dict()["content"] + + defer.returnValue(ev) + + def _parse_events(self, rows): + return self.runInteraction( + "_parse_events", self._parse_events_txn, rows + ) + + def _parse_events_txn(self, txn, rows): + event_ids = [r["event_id"] for r in rows] + + return self._get_events_txn(txn, event_ids) + + def _has_been_redacted_txn(self, txn, event): + sql = "SELECT event_id FROM redactions WHERE redacts = ?" 
+ txn.execute(sql, (event.event_id,)) + result = txn.fetchone() + return result[0] if result else None -- cgit 1.4.1 From f4d58deba19bde81b629c8ceb8df3aca60aa1e15 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 14:45:42 +0100 Subject: PEP8 --- synapse/storage/events.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index f960ef8350..28d2c0896b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -445,7 +445,6 @@ class EventsStore(SQLBaseStore): if e_id in event_map and event_map[e_id] ]) - def _get_events_txn(self, txn, event_ids, check_redacted=True, get_prev_content=False, allow_rejected=False): return unwrap_deferred(self._get_events( @@ -494,7 +493,7 @@ class EventsStore(SQLBaseStore): return event_map def _fetch_events_txn(self, txn, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): + get_prev_content=False, allow_rejected=False): return unwrap_deferred(self._fetch_events( txn, events, check_redacted=check_redacted, -- cgit 1.4.1 From 7f4105a5c99d72662db0704ab03bff24a951005b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 14:51:06 +0100 Subject: Turn off preemptive transactions --- synapse/storage/events.py | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 28d2c0896b..0aa4e0d445 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -420,23 +420,13 @@ class EventsStore(SQLBaseStore): if e_id in event_map and event_map[e_id] ]) - if not txn: - missing_events = yield self.runInteraction( - "_get_events", - self._fetch_events_txn, - missing_events_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - else: - missing_events = yield self._fetch_events( - txn, - missing_events_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) + missing_events = yield self._fetch_events( + txn, + missing_events_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) event_map.update(missing_events) -- cgit 1.4.1 From 7cd6a6f6cf03e5bfde99b839bdc67f8f5513cdc5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 15:34:02 +0100 Subject: Awful idea for speeding up fetching of events --- synapse/storage/_base.py | 4 ++ synapse/storage/events.py | 167 ++++++++++++++++++++++++++++++++++++---------- synapse/util/__init__.py | 8 +-- 3 files changed, 139 insertions(+), 40 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index a8f8989e38..c20ff3a572 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -299,6 +299,10 @@ class SQLBaseStore(object): self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, max_entries=hs.config.event_cache_size) + self._event_fetch_lock = threading.Lock() + self._event_fetch_list = [] + self._event_fetch_ongoing = False + self.database_engine = hs.database_engine self._stream_id_gen = StreamIdGenerator() diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 0aa4e0d445..be88328ce5 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -15,7 +15,7 @@ from _base import SQLBaseStore, _RollbackButIsFineException 
-from twisted.internet import defer +from twisted.internet import defer, reactor from synapse.events import FrozenEvent from synapse.events.utils import prune_event @@ -89,18 +89,17 @@ class EventsStore(SQLBaseStore): Returns: Deferred : A FrozenEvent. """ - event = yield self.runInteraction( - "get_event", self._get_event_txn, - event_id, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, + events = yield self._get_events( + [event_id], + check_redacted=True, + get_prev_content=False, + allow_rejected=False, ) - if not event and not allow_none: + if not events and not allow_none: raise RuntimeError("Could not find event %s" % (event_id,)) - defer.returnValue(event) + defer.returnValue(events[0] if events else None) @log_function def _persist_event_txn(self, txn, event, context, backfilled, @@ -420,13 +419,21 @@ class EventsStore(SQLBaseStore): if e_id in event_map and event_map[e_id] ]) - missing_events = yield self._fetch_events( - txn, - missing_events_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) + if not txn: + missing_events = yield self._enqueue_events( + missing_events_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + else: + missing_events = self._fetch_events_txn( + txn, + missing_events_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) event_map.update(missing_events) @@ -492,11 +499,82 @@ class EventsStore(SQLBaseStore): )) @defer.inlineCallbacks - def _fetch_events(self, txn, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): + def _enqueue_events(self, events, check_redacted=True, + get_prev_content=False, allow_rejected=False): if not events: defer.returnValue({}) + def do_fetch(txn): + event_list = [] + try: + with self._event_fetch_lock: + event_list = self._event_fetch_list + self._event_fetch_list = [] + + if not event_list: + return + + event_id_lists = zip(*event_list)[0] + event_ids = [ + item for sublist in event_id_lists for item in sublist + ] + rows = self._fetch_event_rows(txn, event_ids) + + row_dict = { + r["event_id"]: r + for r in rows + } + + for ids, d in event_list: + d.callback( + [ + row_dict[i] for i in ids + if i in row_dict + ] + ) + except Exception as e: + for _, d in event_list: + try: + reactor.callFromThread(d.errback, e) + except: + pass + finally: + with self._event_fetch_lock: + self._event_fetch_ongoing = False + + def cb(rows): + return defer.gatherResults([ + self._get_event_from_row( + None, + row["internal_metadata"], row["json"], row["redacts"], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + rejected_reason=row["rejects"], + ) + for row in rows + ]) + + d = defer.Deferred() + d.addCallback(cb) + with self._event_fetch_lock: + self._event_fetch_list.append( + (events, d) + ) + + if not self._event_fetch_ongoing: + self.runInteraction( + "do_fetch", + do_fetch + ) + + res = yield d + + defer.returnValue({ + e.event_id: e + for e in res if e + }) + + def _fetch_event_rows(self, txn, events): rows = [] N = 200 for i in range(1 + len(events) / N): @@ -505,43 +583,56 @@ class EventsStore(SQLBaseStore): break sql = ( - "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " + "SELECT " + " e.event_id as event_id, " + " e.internal_metadata," + " e.json," + " r.redacts as redacts," + " rej.event_id as rejects " " FROM event_json as e" " LEFT JOIN 
rejections as rej USING (event_id)" " LEFT JOIN redactions as r ON e.event_id = r.redacts" " WHERE e.event_id IN (%s)" ) % (",".join(["?"]*len(evs)),) - if txn: - txn.execute(sql, evs) - rows.extend(txn.fetchall()) - else: - res = yield self._execute("_fetch_events", None, sql, *evs) - rows.extend(res) + txn.execute(sql, evs) + rows.extend(self.cursor_to_dict(txn)) + + return rows + + @defer.inlineCallbacks + def _fetch_events(self, txn, events, check_redacted=True, + get_prev_content=False, allow_rejected=False): + if not events: + defer.returnValue({}) + + if txn: + rows = self._fetch_event_rows( + txn, events, + ) + else: + rows = yield self.runInteraction( + self._fetch_event_rows, + events, + ) res = yield defer.gatherResults( [ defer.maybeDeferred( self._get_event_from_row, txn, - row[0], row[1], row[2], + row["internal_metadata"], row["json"], row["redacts"], check_redacted=check_redacted, get_prev_content=get_prev_content, - rejected_reason=row[3], + rejected_reason=row["rejects"], ) for row in rows - ], - consumeErrors=True, + ] ) - for e in res: - self._get_event_cache.prefill( - e.event_id, check_redacted, get_prev_content, e - ) - defer.returnValue({ - e.event_id: e - for e in res if e + r.event_id: r + for r in res }) @defer.inlineCallbacks @@ -611,6 +702,10 @@ class EventsStore(SQLBaseStore): if prev: ev.unsigned["prev_content"] = prev.get_dict()["content"] + self._get_event_cache.prefill( + ev.event_id, check_redacted, get_prev_content, ev + ) + defer.returnValue(ev) def _parse_events(self, rows): diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index b9afb3364d..260714ccc2 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -80,16 +80,16 @@ class Clock(object): def stop_looping_call(self, loop): loop.stop() - def call_later(self, delay, callback): + def call_later(self, delay, callback, *args, **kwargs): current_context = LoggingContext.current_context() - def wrapped_callback(): + def wrapped_callback(*args, **kwargs): with PreserveLoggingContext(): LoggingContext.thread_local.current_context = current_context - callback() + callback(*args, **kwargs) with PreserveLoggingContext(): - return reactor.callLater(delay, wrapped_callback) + return reactor.callLater(delay, wrapped_callback, *args, **kwargs) def cancel_call_later(self, timer): timer.cancel() -- cgit 1.4.1 From 96c5b9f87cd5d959d0976487399d4b913082598d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 15:36:04 +0100 Subject: Don't start up more fetch_events --- synapse/storage/events.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index be88328ce5..0859518b1b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -566,6 +566,7 @@ class EventsStore(SQLBaseStore): "do_fetch", do_fetch ) + self._event_fetch_ongoing = True res = yield d -- cgit 1.4.1 From 142934084a374c3e47b63939526827f5afa7410d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 15:40:21 +0100 Subject: Count and loop --- synapse/storage/_base.py | 2 +- synapse/storage/events.py | 70 +++++++++++++++++++++++------------------------ 2 files changed, 35 insertions(+), 37 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c20ff3a572..97bf42469a 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -301,7 +301,7 @@ class SQLBaseStore(object): self._event_fetch_lock = threading.Lock() 
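_fetch_event_rows above queries event ids in slices of 200 so the IN clause's placeholder list stays bounded. The chunking shape, reduced to essentials (the rejections and redactions joins are dropped here; the table and column names follow the query shown above):

def fetch_event_rows(txn, event_ids, batch_size=200):
    rows = []
    for i in range(0, len(event_ids), batch_size):
        chunk = event_ids[i:i + batch_size]
        # One "?" placeholder per id in this slice.
        sql = (
            "SELECT e.event_id, e.internal_metadata, e.json"
            " FROM event_json as e"
            " WHERE e.event_id IN (%s)"
        ) % (",".join(["?"] * len(chunk)),)
        txn.execute(sql, chunk)
        rows.extend(txn.fetchall())
    return rows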
self._event_fetch_list = [] - self._event_fetch_ongoing = False + self._event_fetch_ongoing = 0 self.database_engine = hs.database_engine diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 0859518b1b..a6b2e7677f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -506,41 +506,39 @@ class EventsStore(SQLBaseStore): def do_fetch(txn): event_list = [] - try: - with self._event_fetch_lock: - event_list = self._event_fetch_list - self._event_fetch_list = [] - - if not event_list: - return - - event_id_lists = zip(*event_list)[0] - event_ids = [ - item for sublist in event_id_lists for item in sublist - ] - rows = self._fetch_event_rows(txn, event_ids) - - row_dict = { - r["event_id"]: r - for r in rows - } + while True: + try: + with self._event_fetch_lock: + event_list = self._event_fetch_list + self._event_fetch_list = [] + + if not event_list: + return + + event_id_lists = zip(*event_list)[0] + event_ids = [ + item for sublist in event_id_lists for item in sublist + ] + rows = self._fetch_event_rows(txn, event_ids) + + row_dict = { + r["event_id"]: r + for r in rows + } - for ids, d in event_list: - d.callback( - [ - row_dict[i] for i in ids - if i in row_dict - ] - ) - except Exception as e: - for _, d in event_list: - try: - reactor.callFromThread(d.errback, e) - except: - pass - finally: - with self._event_fetch_lock: - self._event_fetch_ongoing = False + for ids, d in event_list: + d.callback( + [ + row_dict[i] for i in ids + if i in row_dict + ] + ) + except Exception as e: + for _, d in event_list: + try: + reactor.callFromThread(d.errback, e) + except: + pass def cb(rows): return defer.gatherResults([ @@ -561,12 +559,12 @@ class EventsStore(SQLBaseStore): (events, d) ) - if not self._event_fetch_ongoing: + if self._event_fetch_ongoing < 3: + self._event_fetch_ongoing += 1 self.runInteraction( "do_fetch", do_fetch ) - self._event_fetch_ongoing = True res = yield d -- cgit 1.4.1 From ef3d8754f51e38375dc8d5144d2224d5e86ce458 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 15:41:55 +0100 Subject: Call from right thread --- synapse/storage/events.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a6b2e7677f..59af21a2ca 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -513,6 +513,7 @@ class EventsStore(SQLBaseStore): self._event_fetch_list = [] if not event_list: + self._event_fetch_ongoing -= 1 return event_id_lists = zip(*event_list)[0] @@ -527,7 +528,8 @@ class EventsStore(SQLBaseStore): } for ids, d in event_list: - d.callback( + reactor.callFromThread( + d.callback, [ row_dict[i] for i in ids if i in row_dict -- cgit 1.4.1 From 1d566edb81e1dffea026d4e603a12cee664a8eda Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 16:54:35 +0100 Subject: Remove race condition --- synapse/storage/_base.py | 168 +++++++++++++++++++++++------------- synapse/storage/engines/postgres.py | 2 + synapse/storage/engines/sqlite3.py | 2 + synapse/storage/events.py | 81 +++++++++-------- 4 files changed, 157 insertions(+), 96 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 97bf42469a..ceff99c16d 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -26,6 +26,8 @@ from util.id_generators import IdGenerator, StreamIdGenerator from twisted.internet import defer from collections import namedtuple, 
OrderedDict + +import contextlib import functools import sys import time @@ -299,7 +301,7 @@ class SQLBaseStore(object): self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, max_entries=hs.config.event_cache_size) - self._event_fetch_lock = threading.Lock() + self._event_fetch_lock = threading.Condition() self._event_fetch_list = [] self._event_fetch_ongoing = 0 @@ -342,6 +344,84 @@ class SQLBaseStore(object): self._clock.looping_call(loop, 10000) + @contextlib.contextmanager + def _new_transaction(self, conn, desc, after_callbacks): + start = time.time() * 1000 + txn_id = self._TXN_ID + + # We don't really need these to be unique, so lets stop it from + # growing really large. + self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1) + + name = "%s-%x" % (desc, txn_id, ) + + transaction_logger.debug("[TXN START] {%s}", name) + + try: + i = 0 + N = 5 + while True: + try: + txn = conn.cursor() + txn = LoggingTransaction( + txn, name, self.database_engine, after_callbacks + ) + except self.database_engine.module.OperationalError as e: + # This can happen if the database disappears mid + # transaction. + logger.warn( + "[TXN OPERROR] {%s} %s %d/%d", + name, e, i, N + ) + if i < N: + i += 1 + try: + conn.rollback() + except self.database_engine.module.Error as e1: + logger.warn( + "[TXN EROLL] {%s} %s", + name, e1, + ) + continue + raise + except self.database_engine.module.DatabaseError as e: + if self.database_engine.is_deadlock(e): + logger.warn("[TXN DEADLOCK] {%s} %d/%d", name, i, N) + if i < N: + i += 1 + try: + conn.rollback() + except self.database_engine.module.Error as e1: + logger.warn( + "[TXN EROLL] {%s} %s", + name, e1, + ) + continue + raise + + try: + yield txn + conn.commit() + return + except: + try: + conn.rollback() + except: + pass + raise + except Exception as e: + logger.debug("[TXN FAIL] {%s} %s", name, e) + raise + finally: + end = time.time() * 1000 + duration = end - start + + transaction_logger.debug("[TXN END] {%s} %f", name, duration) + + self._current_txn_total_time += duration + self._txn_perf_counters.update(desc, start, end) + sql_txn_timer.inc_by(duration, desc) + @defer.inlineCallbacks def runInteraction(self, desc, func, *args, **kwargs): """Wraps the .runInteraction() method on the underlying db_pool.""" @@ -353,83 +433,49 @@ class SQLBaseStore(object): def inner_func(conn, *args, **kwargs): with LoggingContext("runInteraction") as context: + sql_scheduling_timer.inc_by(time.time() * 1000 - start_time) + if self.database_engine.is_connection_closed(conn): logger.debug("Reconnecting closed database connection") conn.reconnect() current_context.copy_to(context) - start = time.time() * 1000 - txn_id = self._TXN_ID + with self._new_transaction(conn, desc, after_callbacks) as txn: + return func(txn, *args, **kwargs) - # We don't really need these to be unique, so lets stop it from - # growing really large. 
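The _new_transaction context manager added above keeps the old retry-on-deadlock behaviour. One constraint shapes it: a @contextmanager body may only yield once per use, so the retry loop has to run before the single yield, and retrying the transaction body itself is more naturally expressed as a plain function over a callable. A hedged sketch of that alternative shape, with is_deadlock standing in for the engine-specific check the real code delegates to database_engine:

def run_transaction(conn, func, is_deadlock, retries=5):
    for attempt in range(retries + 1):
        txn = conn.cursor()
        try:
            result = func(txn)
            conn.commit()
            return result
        except Exception as e:
            conn.rollback()
            if is_deadlock(e) and attempt < retries:
                continue  # deadlocks are transient; retry the whole txn
            raise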
- self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1) + result = yield preserve_context_over_fn( + self._db_pool.runWithConnection, + inner_func, *args, **kwargs + ) - name = "%s-%x" % (desc, txn_id, ) + for after_callback, after_args in after_callbacks: + after_callback(*after_args) + defer.returnValue(result) + @defer.inlineCallbacks + def runWithConnection(self, func, *args, **kwargs): + """Wraps the .runInteraction() method on the underlying db_pool.""" + current_context = LoggingContext.current_context() + + start_time = time.time() * 1000 + + def inner_func(conn, *args, **kwargs): + with LoggingContext("runWithConnection") as context: sql_scheduling_timer.inc_by(time.time() * 1000 - start_time) - transaction_logger.debug("[TXN START] {%s}", name) - try: - i = 0 - N = 5 - while True: - try: - txn = conn.cursor() - txn = LoggingTransaction( - txn, name, self.database_engine, after_callbacks - ) - return func(txn, *args, **kwargs) - except self.database_engine.module.OperationalError as e: - # This can happen if the database disappears mid - # transaction. - logger.warn( - "[TXN OPERROR] {%s} %s %d/%d", - name, e, i, N - ) - if i < N: - i += 1 - try: - conn.rollback() - except self.database_engine.module.Error as e1: - logger.warn( - "[TXN EROLL] {%s} %s", - name, e1, - ) - continue - except self.database_engine.module.DatabaseError as e: - if self.database_engine.is_deadlock(e): - logger.warn("[TXN DEADLOCK] {%s} %d/%d", name, i, N) - if i < N: - i += 1 - try: - conn.rollback() - except self.database_engine.module.Error as e1: - logger.warn( - "[TXN EROLL] {%s} %s", - name, e1, - ) - continue - raise - except Exception as e: - logger.debug("[TXN FAIL] {%s} %s", name, e) - raise - finally: - end = time.time() * 1000 - duration = end - start - transaction_logger.debug("[TXN END] {%s} %f", name, duration) + if self.database_engine.is_connection_closed(conn): + logger.debug("Reconnecting closed database connection") + conn.reconnect() + + current_context.copy_to(context) - self._current_txn_total_time += duration - self._txn_perf_counters.update(desc, start, end) - sql_txn_timer.inc_by(duration, desc) + return func(conn, *args, **kwargs) result = yield preserve_context_over_fn( self._db_pool.runWithConnection, inner_func, *args, **kwargs ) - for after_callback, after_args in after_callbacks: - after_callback(*after_args) defer.returnValue(result) def cursor_to_dict(self, cursor): diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index a323028546..4a855ffd56 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -19,6 +19,8 @@ from ._base import IncorrectDatabaseSetup class PostgresEngine(object): + single_threaded = False + def __init__(self, database_module): self.module = database_module self.module.extensions.register_type(self.module.extensions.UNICODE) diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py index ff13d8006a..d18e2808d1 100644 --- a/synapse/storage/engines/sqlite3.py +++ b/synapse/storage/engines/sqlite3.py @@ -17,6 +17,8 @@ from synapse.storage import prepare_database, prepare_sqlite3_database class Sqlite3Engine(object): + single_threaded = True + def __init__(self, database_module): self.module = database_module diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 59af21a2ca..b4abd83260 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -504,23 +504,26 @@ class EventsStore(SQLBaseStore): if not events: 
defer.returnValue({}) - def do_fetch(txn): + def do_fetch(conn): event_list = [] while True: try: with self._event_fetch_lock: - event_list = self._event_fetch_list - self._event_fetch_list = [] - - if not event_list: + i = 0 + while not self._event_fetch_list: self._event_fetch_ongoing -= 1 return + event_list = self._event_fetch_list + self._event_fetch_list = [] + event_id_lists = zip(*event_list)[0] event_ids = [ item for sublist in event_id_lists for item in sublist ] - rows = self._fetch_event_rows(txn, event_ids) + + with self._new_transaction(conn, "do_fetch", []) as txn: + rows = self._fetch_event_rows(txn, event_ids) row_dict = { r["event_id"]: r @@ -528,22 +531,44 @@ class EventsStore(SQLBaseStore): } for ids, d in event_list: - reactor.callFromThread( - d.callback, - [ - row_dict[i] for i in ids - if i in row_dict - ] - ) + def fire(): + if not d.called: + d.callback( + [ + row_dict[i] + for i in ids + if i in row_dict + ] + ) + reactor.callFromThread(fire) except Exception as e: + logger.exception("do_fetch") for _, d in event_list: - try: + if not d.called: reactor.callFromThread(d.errback, e) - except: - pass - def cb(rows): - return defer.gatherResults([ + with self._event_fetch_lock: + self._event_fetch_ongoing -= 1 + return + + events_d = defer.Deferred() + with self._event_fetch_lock: + self._event_fetch_list.append( + (events, events_d) + ) + + self._event_fetch_lock.notify_all() + + # if self._event_fetch_ongoing < 5: + self._event_fetch_ongoing += 1 + self.runWithConnection( + do_fetch + ) + + rows = yield events_d + + res = yield defer.gatherResults( + [ self._get_event_from_row( None, row["internal_metadata"], row["json"], row["redacts"], @@ -552,23 +577,9 @@ class EventsStore(SQLBaseStore): rejected_reason=row["rejects"], ) for row in rows - ]) - - d = defer.Deferred() - d.addCallback(cb) - with self._event_fetch_lock: - self._event_fetch_list.append( - (events, d) - ) - - if self._event_fetch_ongoing < 3: - self._event_fetch_ongoing += 1 - self.runInteraction( - "do_fetch", - do_fetch - ) - - res = yield d + ], + consumeErrors=True + ) defer.returnValue({ e.event_id: e -- cgit 1.4.1 From a2c4f3f150f63c720370f6882da804c8ac20fd69 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 10:54:04 +0100 Subject: Fix daedlock --- synapse/federation/federation_client.py | 15 +++- synapse/federation/federation_server.py | 2 + synapse/handlers/message.py | 33 ++++++--- synapse/storage/_base.py | 26 +++---- synapse/storage/events.py | 125 +++++++++++++++++++------------- synapse/storage/stream.py | 2 + tests/storage/test_base.py | 3 +- 7 files changed, 122 insertions(+), 84 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 904c7c0945..c255df1bbb 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -222,7 +222,7 @@ class FederationClient(FederationBase): for p in transaction_data["pdus"] ] - if pdu_list: + if pdu_list and pdu_list[0]: pdu = pdu_list[0] # Check signatures are correct. @@ -255,7 +255,7 @@ class FederationClient(FederationBase): ) continue - if self._get_pdu_cache is not None: + if self._get_pdu_cache is not None and pdu: self._get_pdu_cache[event_id] = pdu defer.returnValue(pdu) @@ -475,6 +475,9 @@ class FederationClient(FederationBase): limit (int): Maximum number of events to return. min_depth (int): Minimum depth of events tor return. 
""" + logger.debug("get_missing_events: latest_events: %r", latest_events) + logger.debug("get_missing_events: earliest_events_ids: %r", earliest_events_ids) + try: content = yield self.transport_layer.get_missing_events( destination=destination, @@ -485,6 +488,8 @@ class FederationClient(FederationBase): min_depth=min_depth, ) + logger.debug("get_missing_events: Got content: %r", content) + events = [ self.event_from_pdu_json(e) for e in content.get("events", []) @@ -494,6 +499,8 @@ class FederationClient(FederationBase): destination, events, outlier=False ) + logger.debug("get_missing_events: signed_events: %r", signed_events) + have_gotten_all_from_destination = True except HttpResponseException as e: if not e.code == 400: @@ -518,6 +525,8 @@ class FederationClient(FederationBase): # Are we missing any? seen_events = set(earliest_events_ids) + + logger.debug("get_missing_events: signed_events2: %r", signed_events) seen_events.update(e.event_id for e in signed_events) missing_events = {} @@ -561,7 +570,7 @@ class FederationClient(FederationBase): res = yield defer.DeferredList(deferreds, consumeErrors=True) for (result, val), (e_id, _) in zip(res, ordered_missing): - if result: + if result and val: signed_events.append(val) else: failed_to_fetch.add(e_id) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index cd79e23f4b..2c6488dd1b 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -415,6 +415,8 @@ class FederationServer(FederationBase): pdu.internal_metadata.outlier = True elif min_depth and pdu.depth > min_depth: if get_missing and prevs - seen: + logger.debug("We're missing: %r", prevs-seen) + latest = yield self.store.get_latest_event_ids_in_room( pdu.room_id ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 867fdbefb0..6a1b25d112 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -303,18 +303,27 @@ class MessageHandler(BaseHandler): if event.membership != Membership.JOIN: return try: - (messages, token), current_state = yield defer.gatherResults( - [ - self.store.get_recent_events_for_room( - event.room_id, - limit=limit, - end_token=now_token.room_key, - ), - self.state_handler.get_current_state( - event.room_id - ), - ] - ).addErrback(unwrapFirstError) + # (messages, token), current_state = yield defer.gatherResults( + # [ + # self.store.get_recent_events_for_room( + # event.room_id, + # limit=limit, + # end_token=now_token.room_key, + # ), + # self.state_handler.get_current_state( + # event.room_id + # ), + # ] + # ).addErrback(unwrapFirstError) + + messages, token = yield self.store.get_recent_events_for_room( + event.room_id, + limit=limit, + end_token=now_token.room_key, + ) + current_state = yield self.state_handler.get_current_state( + event.room_id + ) start_token = now_token.copy_and_replace("room_key", token[0]) end_token = now_token.copy_and_replace("room_key", token[1]) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index ceff99c16d..0df1b46edc 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -301,10 +301,12 @@ class SQLBaseStore(object): self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, max_entries=hs.config.event_cache_size) - self._event_fetch_lock = threading.Condition() + self._event_fetch_lock = threading.Lock() self._event_fetch_list = [] self._event_fetch_ongoing = 0 + self._pending_ds = [] + self.database_engine = hs.database_engine self._stream_id_gen = 
StreamIdGenerator() @@ -344,8 +346,7 @@ class SQLBaseStore(object): self._clock.looping_call(loop, 10000) - @contextlib.contextmanager - def _new_transaction(self, conn, desc, after_callbacks): + def _new_transaction(self, conn, desc, after_callbacks, func, *args, **kwargs): start = time.time() * 1000 txn_id = self._TXN_ID @@ -366,6 +367,9 @@ class SQLBaseStore(object): txn = LoggingTransaction( txn, name, self.database_engine, after_callbacks ) + r = func(txn, *args, **kwargs) + conn.commit() + return r except self.database_engine.module.OperationalError as e: # This can happen if the database disappears mid # transaction. @@ -398,17 +402,6 @@ class SQLBaseStore(object): ) continue raise - - try: - yield txn - conn.commit() - return - except: - try: - conn.rollback() - except: - pass - raise except Exception as e: logger.debug("[TXN FAIL] {%s} %s", name, e) raise @@ -440,8 +433,9 @@ class SQLBaseStore(object): conn.reconnect() current_context.copy_to(context) - with self._new_transaction(conn, desc, after_callbacks) as txn: - return func(txn, *args, **kwargs) + return self._new_transaction( + conn, desc, after_callbacks, func, *args, **kwargs + ) result = yield preserve_context_over_fn( self._db_pool.runWithConnection, diff --git a/synapse/storage/events.py b/synapse/storage/events.py index b4abd83260..260bdf0ec4 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -420,12 +420,14 @@ class EventsStore(SQLBaseStore): ]) if not txn: + logger.debug("enqueue before") missing_events = yield self._enqueue_events( missing_events_ids, check_redacted=check_redacted, get_prev_content=get_prev_content, allow_rejected=allow_rejected, ) + logger.debug("enqueue after") else: missing_events = self._fetch_events_txn( txn, @@ -498,41 +500,39 @@ class EventsStore(SQLBaseStore): allow_rejected=allow_rejected, )) - @defer.inlineCallbacks - def _enqueue_events(self, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): - if not events: - defer.returnValue({}) - - def do_fetch(conn): - event_list = [] + def _do_fetch(self, conn): + event_list = [] + try: while True: - try: - with self._event_fetch_lock: - i = 0 - while not self._event_fetch_list: - self._event_fetch_ongoing -= 1 - return - - event_list = self._event_fetch_list - self._event_fetch_list = [] - - event_id_lists = zip(*event_list)[0] - event_ids = [ - item for sublist in event_id_lists for item in sublist - ] - - with self._new_transaction(conn, "do_fetch", []) as txn: - rows = self._fetch_event_rows(txn, event_ids) - - row_dict = { - r["event_id"]: r - for r in rows - } + logger.debug("do_fetch getting lock") + with self._event_fetch_lock: + logger.debug("do_fetch go lock: %r", self._event_fetch_list) + event_list = self._event_fetch_list + self._event_fetch_list = [] + if not event_list: + self._event_fetch_ongoing -= 1 + return + + event_id_lists = zip(*event_list)[0] + event_ids = [ + item for sublist in event_id_lists for item in sublist + ] + + rows = self._new_transaction( + conn, "do_fetch", [], self._fetch_event_rows, event_ids + ) - for ids, d in event_list: - def fire(): - if not d.called: + row_dict = { + r["event_id"]: r + for r in rows + } + + logger.debug("do_fetch got events: %r", row_dict.keys()) + + def fire(evs): + for ids, d in evs: + if not d.called: + try: d.callback( [ row_dict[i] @@ -540,32 +540,51 @@ class EventsStore(SQLBaseStore): if i in row_dict ] ) - reactor.callFromThread(fire) - except Exception as e: - logger.exception("do_fetch") - for _, d in event_list: - if not 
d.called: - reactor.callFromThread(d.errback, e) + except: + logger.exception("Failed to callback") + reactor.callFromThread(fire, event_list) + except Exception as e: + logger.exception("do_fetch") - with self._event_fetch_lock: - self._event_fetch_ongoing -= 1 - return + def fire(evs): + for _, d in evs: + if not d.called: + d.errback(e) + + if event_list: + reactor.callFromThread(fire, event_list) + + @defer.inlineCallbacks + def _enqueue_events(self, events, check_redacted=True, + get_prev_content=False, allow_rejected=False): + if not events: + defer.returnValue({}) events_d = defer.Deferred() - with self._event_fetch_lock: - self._event_fetch_list.append( - (events, events_d) - ) + try: + logger.debug("enqueueueueue getting lock") + with self._event_fetch_lock: + logger.debug("enqueue go lock") + self._event_fetch_list.append( + (events, events_d) + ) - self._event_fetch_lock.notify_all() + self._event_fetch_ongoing += 1 - # if self._event_fetch_ongoing < 5: - self._event_fetch_ongoing += 1 self.runWithConnection( - do_fetch + self._do_fetch ) - rows = yield events_d + except Exception as e: + if not events_d.called: + events_d.errback(e) + + logger.debug("events_d before") + try: + rows = yield events_d + except: + logger.exception("events_d") + logger.debug("events_d after") res = yield defer.gatherResults( [ @@ -580,6 +599,7 @@ class EventsStore(SQLBaseStore): ], consumeErrors=True ) + logger.debug("gatherResults after") defer.returnValue({ e.event_id: e @@ -639,7 +659,8 @@ class EventsStore(SQLBaseStore): rejected_reason=row["rejects"], ) for row in rows - ] + ], + consumeErrors=True, ) defer.returnValue({ diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index d16b57c515..af45fc5619 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -357,10 +357,12 @@ class StreamStore(SQLBaseStore): "get_recent_events_for_room", get_recent_events_for_room_txn ) + logger.debug("stream before") events = yield self._get_events( [r["event_id"] for r in rows], get_prev_content=True ) + logger.debug("stream after") self._set_before_and_after(events, rows) diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py index 8c348ecc95..8573f18b55 100644 --- a/tests/storage/test_base.py +++ b/tests/storage/test_base.py @@ -33,8 +33,9 @@ class SQLBaseStoreTestCase(unittest.TestCase): def setUp(self): self.db_pool = Mock(spec=["runInteraction"]) self.mock_txn = Mock() - self.mock_conn = Mock(spec_set=["cursor"]) + self.mock_conn = Mock(spec_set=["cursor", "rollback", "commit"]) self.mock_conn.cursor.return_value = self.mock_txn + self.mock_conn.rollback.return_value = None # Our fake runInteraction just runs synchronously inline def runInteraction(func, *args, **kwargs): -- cgit 1.4.1 From de01438a578c285b632a4c791a56bebfe29fb06b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 11:00:50 +0100 Subject: Sort out error handling --- synapse/storage/events.py | 47 ++++++++++++++++++++++------------------------- 1 file changed, 22 insertions(+), 25 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 260bdf0ec4..f2c181dde7 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -502,8 +502,8 @@ class EventsStore(SQLBaseStore): def _do_fetch(self, conn): event_list = [] - try: - while True: + while True: + try: logger.debug("do_fetch getting lock") with self._event_fetch_lock: logger.debug("do_fetch go lock: %r", self._event_fetch_list) @@ -543,16 +543,16 
@@ class EventsStore(SQLBaseStore): except: logger.exception("Failed to callback") reactor.callFromThread(fire, event_list) - except Exception as e: - logger.exception("do_fetch") + except Exception as e: + logger.exception("do_fetch") - def fire(evs): - for _, d in evs: - if not d.called: - d.errback(e) + def fire(evs): + for _, d in evs: + if not d.called: + d.errback(e) - if event_list: - reactor.callFromThread(fire, event_list) + if event_list: + reactor.callFromThread(fire, event_list) @defer.inlineCallbacks def _enqueue_events(self, events, check_redacted=True, @@ -561,29 +561,26 @@ class EventsStore(SQLBaseStore): defer.returnValue({}) events_d = defer.Deferred() - try: - logger.debug("enqueueueueue getting lock") - with self._event_fetch_lock: - logger.debug("enqueue go lock") - self._event_fetch_list.append( - (events, events_d) - ) + logger.debug("enqueueueueue getting lock") + with self._event_fetch_lock: + logger.debug("enqueue go lock") + self._event_fetch_list.append( + (events, events_d) + ) + if self._event_fetch_ongoing < 1: self._event_fetch_ongoing += 1 + should_start = True + else: + should_start = False + if should_start: self.runWithConnection( self._do_fetch ) - except Exception as e: - if not events_d.called: - events_d.errback(e) - logger.debug("events_d before") - try: - rows = yield events_d - except: - logger.exception("events_d") + rows = yield events_d logger.debug("events_d after") res = yield defer.gatherResults( -- cgit 1.4.1 From 575ec91d82e6b283bd21471c5a874de968c97bff Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 11:15:10 +0100 Subject: Correctly pass through params --- synapse/storage/events.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index f2c181dde7..143c24b10d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -91,9 +91,9 @@ class EventsStore(SQLBaseStore): """ events = yield self._get_events( [event_id], - check_redacted=True, - get_prev_content=False, - allow_rejected=False, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, ) if not events and not allow_none: -- cgit 1.4.1 From 372d4c6d7b38f89fb79509cf432915d96bdc8164 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 11:26:00 +0100 Subject: Srsly. Don't use closures. 
Baaaaaad --- synapse/storage/events.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 143c24b10d..2c3e6d5a5c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -529,20 +529,18 @@ class EventsStore(SQLBaseStore): logger.debug("do_fetch got events: %r", row_dict.keys()) - def fire(evs): - for ids, d in evs: + def fire(lst, res): + for ids, d in lst: if not d.called: try: - d.callback( - [ - row_dict[i] - for i in ids - if i in row_dict - ] - ) + d.callback([ + res[i] + for i in ids + if i in res + ]) except: logger.exception("Failed to callback") - reactor.callFromThread(fire, event_list) + reactor.callFromThread(fire, event_list, row_dict) except Exception as e: logger.exception("do_fetch") -- cgit 1.4.1 From aa32bd38e40cd8d69406fe74581290ad7fe34f35 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 11:35:04 +0100 Subject: Add a wait --- synapse/storage/_base.py | 2 +- synapse/storage/events.py | 13 +++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 0df1b46edc..5d86aa5cd4 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -301,7 +301,7 @@ class SQLBaseStore(object): self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, max_entries=hs.config.event_cache_size) - self._event_fetch_lock = threading.Lock() + self._event_fetch_lock = threading.Condition() self._event_fetch_list = [] self._event_fetch_ongoing = 0 diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 2c3e6d5a5c..f694b877f4 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -502,6 +502,7 @@ class EventsStore(SQLBaseStore): def _do_fetch(self, conn): event_list = [] + i = 0 while True: try: logger.debug("do_fetch getting lock") @@ -510,8 +511,14 @@ class EventsStore(SQLBaseStore): event_list = self._event_fetch_list self._event_fetch_list = [] if not event_list: - self._event_fetch_ongoing -= 1 - return + if self.database_engine.single_threaded or i > 5: + self._event_fetch_ongoing -= 1 + return + else: + self._event_fetch_lock.wait(0.1) + i += 1 + continue + i = 0 event_id_lists = zip(*event_list)[0] event_ids = [ @@ -566,6 +573,8 @@ class EventsStore(SQLBaseStore): (events, events_d) ) + self._event_fetch_lock.notify_all() + if self._event_fetch_ongoing < 1: self._event_fetch_ongoing += 1 should_start = True -- cgit 1.4.1 From e275a9c0d9365f77cb06f2a04c55f591ac5b54c7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 11:54:51 +0100 Subject: preserve log context --- synapse/storage/events.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index f694b877f4..c621516039 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -21,6 +21,7 @@ from synapse.events import FrozenEvent from synapse.events.utils import prune_event from synapse.util import unwrap_deferred +from synapse.util.logcontext import preserve_context_over_deferred from synapse.util.logutils import log_function from synapse.api.constants import EventTypes from synapse.crypto.event_signing import compute_event_reference_hash @@ -587,7 +588,7 @@ class EventsStore(SQLBaseStore): ) logger.debug("events_d before") - rows = yield events_d + rows = yield preserve_context_over_deferred(events_d) -- cgit 1.4.1
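The "Add a wait" change above is the crux of the batching scheme: _event_fetch_lock becomes a threading.Condition, so an idle fetcher thread can linger briefly for more requests instead of exiting the moment its queue drains, while enqueuers notify it when new work arrives. A self-contained sketch of that pattern follows; the names (BatchQueue, drain) are made up for illustration, and this is not the Synapse class itself:

    import threading

    class BatchQueue(object):
        """Toy model of the fetcher wake-up logic: a worker lingers on a
        Condition for a short while before giving its thread back."""

        def __init__(self, idle_timeout=0.1, max_idle_spins=3):
            self.cond = threading.Condition()
            self.items = []
            self.idle_timeout = idle_timeout      # the 0.1s wait in the patch
            self.max_idle_spins = max_idle_spins  # the i > N cut-off

        def put(self, item):
            with self.cond:
                self.items.append(item)
                self.cond.notify()  # wake a lingering worker, if there is one

        def drain(self):
            """Called from a worker thread. Returns a batch of items, or
            None when the worker should exit because none arrived in time."""
            spins = 0
            with self.cond:
                while not self.items:
                    if spins >= self.max_idle_spins:
                        return None
                    # wait() releases the lock, so put() can get in
                    self.cond.wait(self.idle_timeout)
                    spins += 1
                batch, self.items = self.items, []
                return batch

Note that the real loop also resets its retry counter whenever it does find work, and skips the wait entirely when the database engine is marked single_threaded (SQLite), since no second request can be queued concurrently there.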
From 0f29cfabc3177c149a4c34fc05398b8d9f3a06ed Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 14:06:42 +0100 Subject: Remove debug logging --- synapse/storage/events.py | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c621516039..716f10386f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -506,13 +506,11 @@ class EventsStore(SQLBaseStore): i = 0 while True: try: - logger.debug("do_fetch getting lock") with self._event_fetch_lock: - logger.debug("do_fetch go lock: %r", self._event_fetch_list) event_list = self._event_fetch_list self._event_fetch_list = [] if not event_list: - if self.database_engine.single_threaded or i > 5: + if self.database_engine.single_threaded or i > 3: self._event_fetch_ongoing -= 1 return else: @@ -535,8 +533,6 @@ class EventsStore(SQLBaseStore): for r in rows } - logger.debug("do_fetch got events: %r", row_dict.keys()) - def fire(lst, res): for ids, d in lst: if not d.called: @@ -567,16 +563,14 @@ class EventsStore(SQLBaseStore): defer.returnValue({}) events_d = defer.Deferred() - logger.debug("enqueueueueue getting lock") with self._event_fetch_lock: - logger.debug("enqueue go lock") self._event_fetch_list.append( (events, events_d) ) - self._event_fetch_lock.notify_all() + self._event_fetch_lock.notify() - if self._event_fetch_ongoing < 1: + if self._event_fetch_ongoing < 3: self._event_fetch_ongoing += 1 should_start = True else: @@ -587,9 +581,7 @@ class EventsStore(SQLBaseStore): self._do_fetch ) - logger.debug("events_d before") rows = yield preserve_context_over_deferred(events_d) - logger.debug("events_d after") res = yield defer.gatherResults( -- cgit 1.4.1 From d62dee7eae741034abfd050982b1cfc4e2181258 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 15:06:37 +0100 Subject: Remove more debug logging --- synapse/storage/events.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 716f10386f..3cf2f7cff8 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -421,14 +421,12 @@ class EventsStore(SQLBaseStore): ]) if not txn: - logger.debug("enqueue before") missing_events = yield self._enqueue_events( missing_events_ids, check_redacted=check_redacted, get_prev_content=get_prev_content, allow_rejected=allow_rejected, ) - logger.debug("enqueue after") else: missing_events = self._fetch_events_txn( txn, -- cgit 1.4.1
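"Srsly. Don't use closures. Baaaaaad", a few commits up, fixes a subtle bug worth spelling out: a function handed to reactor.callFromThread runs later, on the reactor thread, and if it reads variables from the enclosing worker loop it sees whatever those names point at by then, not at scheduling time. Passing the values as arguments freezes them. A minimal, reactor-free illustration of the same trap (the call_later helper is a stand-in for callFromThread, not a real Twisted API):

    pending = []

    def call_later(fn, *args):
        # stand-in for reactor.callFromThread: fn runs after the
        # scheduling loop has finished and rebound its variables
        pending.append((fn, args))

    buggy, fixed = [], []

    for batch in ([1], [2], [3]):
        # Bug: the lambda looks `batch` up when it eventually runs,
        # so every call sees the final value.
        call_later(lambda: buggy.append(batch))

    for batch in ([1], [2], [3]):
        # Fix: pass the current value in as an argument instead.
        call_later(lambda b: fixed.append(b), batch)

    for fn, args in pending:
        fn(*args)

    print(buggy)  # [[3], [3], [3]]
    print(fixed)  # [[1], [2], [3]]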
From acb12cc811d7ce7cb3c5b6544ed28f7d6592ef33 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 15:20:05 +0100 Subject: Make store.get_current_state fetch events asyncly --- synapse/storage/events.py | 1 - synapse/storage/state.py | 7 ++++--- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 3cf2f7cff8..066e1aab75 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -594,7 +594,6 @@ class EventsStore(SQLBaseStore): ], consumeErrors=True ) - logger.debug("gatherResults after") defer.returnValue({ e.event_id: e diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 3f5642642d..b3f2a4dfa1 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -159,11 +159,12 @@ class StateStore(SQLBaseStore): args = (room_id, ) txn.execute(sql, args) - results = self.cursor_to_dict(txn) + results = txn.fetchall() - return self._parse_events_txn(txn, results) + return [r[0] for r in results] - events = yield self.runInteraction("get_current_state", f) + event_ids = yield self.runInteraction("get_current_state", f) + events = yield self._get_events(event_ids) defer.returnValue(events) -- cgit 1.4.1 From 70f272f71ca399205a72deb29b0f86ff3bf23618 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 16:34:17 +0100 Subject: Don't completely drain the list --- synapse/storage/events.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 066e1aab75..6ee2e9a485 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -505,8 +505,15 @@ class EventsStore(SQLBaseStore): while True: try: with self._event_fetch_lock: - event_list = self._event_fetch_list - self._event_fetch_list = [] + tot = 0 + for j, lst in enumerate(self._event_fetch_list): + if tot > 200: + break + tot += len(lst[0]) + + event_list = self._event_fetch_list[:j+1] + self._event_fetch_list = self._event_fetch_list[j+1:] + if not event_list: if self.database_engine.single_threaded or i > 3: self._event_fetch_ongoing -= 1 -- cgit 1.4.1 From 9ff7f66a2be6d3f0542b69e784b400df349869ff Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 16:36:03 +0100 Subject: init j --- synapse/storage/events.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 6ee2e9a485..f21364606d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -506,6 +506,7 @@ class EventsStore(SQLBaseStore): try: with self._event_fetch_lock: tot = 0 + j = 0 for j, lst in enumerate(self._event_fetch_list): if tot > 200: break -- cgit 1.4.1
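The "Don't completely drain the list" / "init j" pair above trips over another classic Python trap: a for loop only binds its loop variable if the iterable yields at least one item, so when _event_fetch_list was empty the slice self._event_fetch_list[:j+1] raised an UnboundLocalError until j was pre-initialised. The sketch below distills the cap-the-batch logic with that fix in place (take_batch is a hypothetical name, and, as the next commit notes, the limiting was reverted because it did not help performance):

    def take_batch(queue, max_events=200):
        """Claim requests from the front of `queue` until roughly
        `max_events` event ids are covered, leaving the rest queued."""
        tot = 0
        j = 0  # must be pre-bound: the loop body never runs on an empty queue
        for j, (event_ids, deferred) in enumerate(queue):
            if tot > max_events:
                break
            tot += len(event_ids)
        batch = queue[:j + 1]
        del queue[:j + 1]
        return batch

    queue = [(["$ev1", "$ev2"], None), (["$ev3"], None)]
    print(take_batch(queue))  # both requests fit under the cap
    print(take_batch(queue))  # empty queue: returns [] instead of blowing up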
From 6c74fd62a0b2292b4fce9d59ea9790685e256e0a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 16:45:35 +0100 Subject: Revert limiting of fetching, it didn't help perf. --- synapse/storage/events.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index f21364606d..98a5663932 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -505,15 +505,8 @@ class EventsStore(SQLBaseStore): while True: try: with self._event_fetch_lock: - tot = 0 - j = 0 - for j, lst in enumerate(self._event_fetch_list): - if tot > 200: - break - tot += len(lst[0]) - - event_list = self._event_fetch_list[:j+1] - self._event_fetch_list = self._event_fetch_list[j+1:] + event_list = self._event_fetch_list + self._event_fetch_list = [] if not event_list: if self.database_engine.single_threaded or i > 3: -- cgit 1.4.1 From c3b37abdfd6d34ea7ff7698bd61dd37763e257b1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 16:59:58 +0100 Subject: PEP8 --- synapse/storage/_base.py | 1 - synapse/storage/events.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 5d86aa5cd4..b529e0543e 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -27,7 +27,6 @@ from twisted.internet import defer from collections import namedtuple, OrderedDict -import contextlib import functools import sys import time diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 98a5663932..25fb49a28d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -629,7 +629,7 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def _fetch_events(self, txn, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): + get_prev_content=False, allow_rejected=False): if not events: -- cgit 1.4.1 From 10f1bdb9a27abe34c8b022c32209a4abfc8cb443 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 May 2015 10:21:40 +0100 Subject: Move get_events functions to storage.events --- synapse/storage/_base.py | 155 ---------------------------------------------- synapse/storage/events.py | 132 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 132 insertions(+), 155 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 81052409b7..ec80169c5b 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -15,8 +15,6 @@ import logging from synapse.api.errors import StoreError -from synapse.events import FrozenEvent -from synapse.events.utils import prune_event from synapse.util.logutils import log_function from synapse.util.logcontext import preserve_context_over_fn, LoggingContext from synapse.util.lrucache import LruCache @@ -28,7 +26,6 @@ from twisted.internet import defer from collections import namedtuple, OrderedDict import functools -import simplejson as json import sys import time import threading @@ -867,158 +864,6 @@ class SQLBaseStore(object): return self.runInteraction("_simple_max_id", func) - def _get_events(self, event_ids, check_redacted=True, - get_prev_content=False): - return self.runInteraction( - "_get_events", self._get_events_txn, event_ids, - check_redacted=check_redacted, get_prev_content=get_prev_content, - ) - - def _get_events_txn(self, txn, event_ids, check_redacted=True, - get_prev_content=False): - if not event_ids: - return [] - - events = [ - self._get_event_txn( - txn, event_id, - check_redacted=check_redacted, - get_prev_content=get_prev_content - ) - for event_id
in event_ids - ] - - return [e for e in events if e] - - def _invalidate_get_event_cache(self, event_id): - for check_redacted in (False, True): - for get_prev_content in (False, True): - self._get_event_cache.invalidate(event_id, check_redacted, - get_prev_content) - - def _get_event_txn(self, txn, event_id, check_redacted=True, - get_prev_content=False, allow_rejected=False): - - start_time = time.time() * 1000 - - def update_counter(desc, last_time): - curr_time = self._get_event_counters.update(desc, last_time) - sql_getevents_timer.inc_by(curr_time - last_time, desc) - return curr_time - - try: - ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content) - - if allow_rejected or not ret.rejected_reason: - return ret - else: - return None - except KeyError: - pass - finally: - start_time = update_counter("event_cache", start_time) - - sql = ( - "SELECT e.internal_metadata, e.json, r.event_id, rej.reason " - "FROM event_json as e " - "LEFT JOIN redactions as r ON e.event_id = r.redacts " - "LEFT JOIN rejections as rej on rej.event_id = e.event_id " - "WHERE e.event_id = ? " - "LIMIT 1 " - ) - - txn.execute(sql, (event_id,)) - - res = txn.fetchone() - - if not res: - return None - - internal_metadata, js, redacted, rejected_reason = res - - start_time = update_counter("select_event", start_time) - - result = self._get_event_from_row_txn( - txn, internal_metadata, js, redacted, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - rejected_reason=rejected_reason, - ) - self._get_event_cache.prefill(event_id, check_redacted, get_prev_content, result) - - if allow_rejected or not rejected_reason: - return result - else: - return None - - def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, - check_redacted=True, get_prev_content=False, - rejected_reason=None): - - start_time = time.time() * 1000 - - def update_counter(desc, last_time): - curr_time = self._get_event_counters.update(desc, last_time) - sql_getevents_timer.inc_by(curr_time - last_time, desc) - return curr_time - - d = json.loads(js) - start_time = update_counter("decode_json", start_time) - - internal_metadata = json.loads(internal_metadata) - start_time = update_counter("decode_internal", start_time) - - ev = FrozenEvent( - d, - internal_metadata_dict=internal_metadata, - rejected_reason=rejected_reason, - ) - start_time = update_counter("build_frozen_event", start_time) - - if check_redacted and redacted: - ev = prune_event(ev) - - ev.unsigned["redacted_by"] = redacted - # Get the redaction event. - - because = self._get_event_txn( - txn, - redacted, - check_redacted=False - ) - - if because: - ev.unsigned["redacted_because"] = because - start_time = update_counter("redact_event", start_time) - - if get_prev_content and "replaces_state" in ev.unsigned: - prev = self._get_event_txn( - txn, - ev.unsigned["replaces_state"], - get_prev_content=False, - ) - if prev: - ev.unsigned["prev_content"] = prev.get_dict()["content"] - start_time = update_counter("get_prev_content", start_time) - - return ev - - def _parse_events(self, rows): - return self.runInteraction( - "_parse_events", self._parse_events_txn, rows - ) - - def _parse_events_txn(self, txn, rows): - event_ids = [r["event_id"] for r in rows] - - return self._get_events_txn(txn, event_ids) - - def _has_been_redacted_txn(self, txn, event): - sql = "SELECT event_id FROM redactions WHERE redacts = ?" 
- txn.execute(sql, (event.event_id,)) - result = txn.fetchone() - return result[0] if result else None - def get_next_stream_id(self): with self._next_stream_id_lock: i = self._next_stream_id diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 9242b0a84e..afdf0f7193 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -17,6 +17,9 @@ from _base import SQLBaseStore, _RollbackButIsFineException from twisted.internet import defer +from synapse.events import FrozenEvent +from synapse.events.utils import prune_event + from synapse.util.logutils import log_function from synapse.api.constants import EventTypes from synapse.crypto.event_signing import compute_event_reference_hash @@ -26,6 +29,7 @@ from syutil.jsonutil import encode_canonical_json from contextlib import contextmanager import logging +import simplejson as json logger = logging.getLogger(__name__) @@ -393,3 +397,131 @@ class EventsStore(SQLBaseStore): return self.runInteraction( "have_events", f, ) + + def _get_events(self, event_ids, check_redacted=True, + get_prev_content=False): + return self.runInteraction( + "_get_events", self._get_events_txn, event_ids, + check_redacted=check_redacted, get_prev_content=get_prev_content, + ) + + def _get_events_txn(self, txn, event_ids, check_redacted=True, + get_prev_content=False): + if not event_ids: + return [] + + events = [ + self._get_event_txn( + txn, event_id, + check_redacted=check_redacted, + get_prev_content=get_prev_content + ) + for event_id in event_ids + ] + + return [e for e in events if e] + + def _invalidate_get_event_cache(self, event_id): + for check_redacted in (False, True): + for get_prev_content in (False, True): + self._get_event_cache.invalidate(event_id, check_redacted, + get_prev_content) + + def _get_event_txn(self, txn, event_id, check_redacted=True, + get_prev_content=False, allow_rejected=False): + + try: + ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content) + + if allow_rejected or not ret.rejected_reason: + return ret + else: + return None + except KeyError: + pass + + sql = ( + "SELECT e.internal_metadata, e.json, r.event_id, rej.reason " + "FROM event_json as e " + "LEFT JOIN redactions as r ON e.event_id = r.redacts " + "LEFT JOIN rejections as rej on rej.event_id = e.event_id " + "WHERE e.event_id = ? " + "LIMIT 1 " + ) + + txn.execute(sql, (event_id,)) + + res = txn.fetchone() + + if not res: + return None + + internal_metadata, js, redacted, rejected_reason = res + + result = self._get_event_from_row_txn( + txn, internal_metadata, js, redacted, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + rejected_reason=rejected_reason, + ) + self._get_event_cache.prefill(event_id, check_redacted, get_prev_content, result) + + if allow_rejected or not rejected_reason: + return result + else: + return None + + def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, + check_redacted=True, get_prev_content=False, + rejected_reason=None): + + d = json.loads(js) + internal_metadata = json.loads(internal_metadata) + + ev = FrozenEvent( + d, + internal_metadata_dict=internal_metadata, + rejected_reason=rejected_reason, + ) + + if check_redacted and redacted: + ev = prune_event(ev) + + ev.unsigned["redacted_by"] = redacted + # Get the redaction event. 
+ + because = self._get_event_txn( + txn, + redacted, + check_redacted=False + ) + + if because: + ev.unsigned["redacted_because"] = because + + if get_prev_content and "replaces_state" in ev.unsigned: + prev = self._get_event_txn( + txn, + ev.unsigned["replaces_state"], + get_prev_content=False, + ) + if prev: + ev.unsigned["prev_content"] = prev.get_dict()["content"] + + return ev + + def _parse_events(self, rows): + return self.runInteraction( + "_parse_events", self._parse_events_txn, rows + ) + + def _parse_events_txn(self, txn, rows): + event_ids = [r["event_id"] for r in rows] + + return self._get_events_txn(txn, event_ids) + + def _has_been_redacted_txn(self, txn, event): + sql = "SELECT event_id FROM redactions WHERE redacts = ?" + txn.execute(sql, (event.event_id,)) + result = txn.fetchone() + return result[0] if result else None -- cgit 1.4.1 From 4d1b6f4ad1bbae71c6c941b5951d9f9043ddace4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 May 2015 14:03:46 +0100 Subject: Remove rejected events if we don't want rejected events --- synapse/storage/events.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 25fb49a28d..5a04d45e1e 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -643,6 +643,9 @@ class EventsStore(SQLBaseStore): events, ) + if not allow_rejected: + rows[:] = [r for r in rows if not r["rejects"]] + res = yield defer.gatherResults( [ defer.maybeDeferred( -- cgit 1.4.1 From 165eb2dbe63a42ef5fdd947544e8d7fda0e7234f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 May 2015 15:18:41 +0100 Subject: Comments and shuffle of functions --- synapse/storage/events.py | 80 +++++++++++++++++++++++++---------------------- 1 file changed, 42 insertions(+), 38 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5a04d45e1e..9751f024d7 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -402,6 +402,10 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def _get_events(self, event_ids, check_redacted=True, get_prev_content=False, allow_rejected=False, txn=None): + """Gets a collection of events. If `txn` is not None then we use the + current transaction to fetch events and we return a deferred that is + guaranteed to have resolved. + """ if not event_ids: defer.returnValue([]) @@ -490,16 +494,10 @@ class EventsStore(SQLBaseStore): return event_map - def _fetch_events_txn(self, txn, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): - return unwrap_deferred(self._fetch_events( - txn, events, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - )) - def _do_fetch(self, conn): + """Takes a database connection and waits for requests for events from + the _event_fetch_list queue.
+ """ event_list = [] i = 0 while True: @@ -532,6 +530,7 @@ class EventsStore(SQLBaseStore): for r in rows } + # We only want to resolve deferreds from the main thread def fire(lst, res): for ids, d in lst: if not d.called: @@ -547,6 +546,7 @@ class EventsStore(SQLBaseStore): except Exception as e: logger.exception("do_fetch") + # We only want to resolve deferreds from the main thread def fire(evs): for _, d in evs: if not d.called: @@ -558,6 +558,10 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def _enqueue_events(self, events, check_redacted=True, get_prev_content=False, allow_rejected=False): + """Fetches events from the database using the _event_fetch_list. This + allows batch and bulk fetching of events - it allows us to fetch events + without having to create a new transaction for each request for events. + """ if not events: defer.returnValue({}) @@ -582,6 +586,9 @@ class EventsStore(SQLBaseStore): rows = yield preserve_context_over_deferred(events_d) + if not allow_rejected: + rows[:] = [r for r in rows if not r["rejects"]] + res = yield defer.gatherResults( [ self._get_event_from_row( @@ -627,49 +634,46 @@ class EventsStore(SQLBaseStore): return rows - @defer.inlineCallbacks - def _fetch_events(self, txn, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): + def _fetch_events_txn(self, txn, events, check_redacted=True, + get_prev_content=False, allow_rejected=False): if not events: - defer.returnValue({}) + return {} - if txn: - rows = self._fetch_event_rows( - txn, events, - ) - else: - rows = yield self.runInteraction( - self._fetch_event_rows, - events, - ) + rows = self._fetch_event_rows( + txn, events, + ) if not allow_rejected: rows[:] = [r for r in rows if not r["rejects"]] - res = yield defer.gatherResults( - [ - defer.maybeDeferred( - self._get_event_from_row, - txn, - row["internal_metadata"], row["json"], row["redacts"], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - rejected_reason=row["rejects"], - ) - for row in rows - ], - consumeErrors=True, - ) + res = [ + unwrap_deferred(self._get_event_from_row( + txn, + row["internal_metadata"], row["json"], row["redacts"], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + rejected_reason=row["rejects"], + )) + for row in rows + ] - defer.returnValue({ + return { r.event_id: r for r in res - }) + } @defer.inlineCallbacks def _get_event_from_row(self, txn, internal_metadata, js, redacted, check_redacted=True, get_prev_content=False, rejected_reason=None): + """This is called when we have a row from the database that we want to + convert into an event. Depending on the given options it may do more + database ops to fill in extra information (e.g. previous content or + rejection reason.) + + `txn` may be None, and if so this creates new transactions for each + database op. 
+ """ d = json.loads(js) internal_metadata = json.loads(internal_metadata) -- cgit 1.4.1 From 227f8ef03129a3eb9fe4e4a3f21e397b11032836 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 May 2015 13:00:57 +0100 Subject: Split out _get_event_from_row back into defer and _txn version --- synapse/storage/events.py | 68 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 57 insertions(+), 11 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 9751f024d7..b0d474b8e4 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -592,7 +592,6 @@ class EventsStore(SQLBaseStore): res = yield defer.gatherResults( [ self._get_event_from_row( - None, row["internal_metadata"], row["json"], row["redacts"], check_redacted=check_redacted, get_prev_content=get_prev_content, @@ -647,13 +646,13 @@ class EventsStore(SQLBaseStore): rows[:] = [r for r in rows if not r["rejects"]] res = [ - unwrap_deferred(self._get_event_from_row( + self._get_event_from_row_txn( txn, row["internal_metadata"], row["json"], row["redacts"], check_redacted=check_redacted, get_prev_content=get_prev_content, rejected_reason=row["rejects"], - )) + ) for row in rows ] @@ -663,17 +662,64 @@ class EventsStore(SQLBaseStore): } @defer.inlineCallbacks - def _get_event_from_row(self, txn, internal_metadata, js, redacted, + def _get_event_from_row(self, internal_metadata, js, redacted, check_redacted=True, get_prev_content=False, rejected_reason=None): - """This is called when we have a row from the database that we want to - convert into an event. Depending on the given options it may do more - database ops to fill in extra information (e.g. previous content or - rejection reason.) + d = json.loads(js) + internal_metadata = json.loads(internal_metadata) - `txn` may be None, and if so this creates new transactions for each - database op. - """ + if rejected_reason: + rejected_reason = yield self._simple_select_one_onecol( + table="rejections", + keyvalues={"event_id": rejected_reason}, + retcol="reason", + desc="_get_event_from_row", + ) + + ev = FrozenEvent( + d, + internal_metadata_dict=internal_metadata, + rejected_reason=rejected_reason, + ) + + if check_redacted and redacted: + ev = prune_event(ev) + + redaction_id = yield self._simple_select_one_onecol( + table="redactions", + keyvalues={"redacts": ev.event_id}, + retcol="event_id", + desc="_get_event_from_row", + ) + + ev.unsigned["redacted_by"] = redaction_id + # Get the redaction event. 
+ + because = yield self.get_event( + redaction_id, + check_redacted=False + ) + + if because: + ev.unsigned["redacted_because"] = because + + if get_prev_content and "replaces_state" in ev.unsigned: + prev = yield self.get_event( + ev.unsigned["replaces_state"], + get_prev_content=False, + ) + if prev: + ev.unsigned["prev_content"] = prev.get_dict()["content"] + + self._get_event_cache.prefill( + ev.event_id, check_redacted, get_prev_content, ev + ) + + defer.returnValue(ev) + + def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, + check_redacted=True, get_prev_content=False, + rejected_reason=None): d = json.loads(js) internal_metadata = json.loads(internal_metadata) -- cgit 1.4.1 From f407cbd2f1f034ee2f7be8a9464aa234266420cc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 May 2015 13:02:01 +0100 Subject: PEP8 --- synapse/storage/events.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index b0d474b8e4..3f7f546bdd 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -718,8 +718,8 @@ class EventsStore(SQLBaseStore): defer.returnValue(ev) def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, - check_redacted=True, get_prev_content=False, - rejected_reason=None): + check_redacted=True, get_prev_content=False, + rejected_reason=None): d = json.loads(js) internal_metadata = json.loads(internal_metadata) -- cgit 1.4.1 From ab45e12d31bf8238c3cb1887d18d89ae2addbb7a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 May 2015 13:07:19 +0100 Subject: Make not return a deferred _get_event_from_row_txn --- synapse/storage/events.py | 25 +++++-------------------- 1 file changed, 5 insertions(+), 20 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 3f7f546bdd..4aa4e7ab15 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -723,23 +723,8 @@ class EventsStore(SQLBaseStore): d = json.loads(js) internal_metadata = json.loads(internal_metadata) - def select(txn, *args, **kwargs): - if txn: - return self._simple_select_one_onecol_txn(txn, *args, **kwargs) - else: - return self._simple_select_one_onecol( - *args, - desc="_get_event_from_row", **kwargs - ) - - def get_event(txn, *args, **kwargs): - if txn: - return self._get_event_txn(txn, *args, **kwargs) - else: - return self.get_event(*args, **kwargs) - if rejected_reason: - rejected_reason = yield select( + rejected_reason = self._simple_select_one_onecol_txn( txn, table="rejections", keyvalues={"event_id": rejected_reason}, @@ -755,7 +740,7 @@ class EventsStore(SQLBaseStore): if check_redacted and redacted: ev = prune_event(ev) - redaction_id = yield select( + redaction_id = self._simple_select_one_onecol_txn( txn, table="redactions", keyvalues={"redacts": ev.event_id}, @@ -765,7 +750,7 @@ class EventsStore(SQLBaseStore): ev.unsigned["redacted_by"] = redaction_id # Get the redaction event. 
- because = yield get_event( + because = self._get_event_txn( txn, redaction_id, check_redacted=False @@ -775,7 +760,7 @@ class EventsStore(SQLBaseStore): ev.unsigned["redacted_because"] = because if get_prev_content and "replaces_state" in ev.unsigned: - prev = yield get_event( + prev = self._get_event_txn( txn, ev.unsigned["replaces_state"], get_prev_content=False, @@ -787,7 +772,7 @@ class EventsStore(SQLBaseStore): ev.event_id, check_redacted, get_prev_content, ev ) - defer.returnValue(ev) + return ev def _parse_events(self, rows): return self.runInteraction( -- cgit 1.4.1 From 9118a928621ae31a951412b867179f5e0a627976 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 May 2015 13:27:16 +0100 Subject: Split up _get_events into defer and txn versions --- synapse/storage/events.py | 59 ++++++++++++++++++++++++++++------------------- 1 file changed, 35 insertions(+), 24 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 4aa4e7ab15..656e57b5c6 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -19,7 +19,6 @@ from twisted.internet import defer, reactor from synapse.events import FrozenEvent from synapse.events.utils import prune_event -from synapse.util import unwrap_deferred from synapse.util.logcontext import preserve_context_over_deferred from synapse.util.logutils import log_function @@ -401,11 +400,7 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def _get_events(self, event_ids, check_redacted=True, - get_prev_content=False, allow_rejected=False, txn=None): - """Gets a collection of events. If `txn` is not None then we use the - current transaction to fetch events and we return a deferred that is - guaranteed to have resolved. - """ + get_prev_content=False, allow_rejected=False): if not event_ids: defer.returnValue([]) @@ -424,21 +419,12 @@ class EventsStore(SQLBaseStore): if e_id in event_map and event_map[e_id] ]) - if not txn: - missing_events = yield self._enqueue_events( - missing_events_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - else: - missing_events = self._fetch_events_txn( - txn, - missing_events_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) + missing_events = yield self._enqueue_events( + missing_events_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) event_map.update(missing_events) @@ -449,13 +435,38 @@ class EventsStore(SQLBaseStore): def _get_events_txn(self, txn, event_ids, check_redacted=True, get_prev_content=False, allow_rejected=False): - return unwrap_deferred(self._get_events( + if not event_ids: + return [] + + event_map = self._get_events_from_cache( event_ids, check_redacted=check_redacted, get_prev_content=get_prev_content, allow_rejected=allow_rejected, - txn=txn, - )) + ) + + missing_events_ids = [e for e in event_ids if e not in event_map] + + if not missing_events_ids: + return [ + event_map[e_id] for e_id in event_ids + if e_id in event_map and event_map[e_id] + ] + + missing_events = self._fetch_events_txn( + txn, + missing_events_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + + event_map.update(missing_events) + + return [ + event_map[e_id] for e_id in event_ids + if e_id in event_map and event_map[e_id] + ] def _invalidate_get_event_cache(self, event_id): for check_redacted in (False, True): -- cgit 1.4.1
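One detail worth pulling out of the diffs above: at this point the event cache is keyed on the triple (event_id, check_redacted, get_prev_content) — see the keylen=3 Cache and the prefill calls — so the same event can be cached in up to four rendered forms, and _invalidate_get_event_cache has to clear every combination. Roughly, with a plain dict standing in for Synapse's Cache class (EventCache is a made-up name):

    from itertools import product

    class EventCache(object):
        """Dict-based stand-in for a cache keyed on (event_id, options)."""

        def __init__(self):
            self._entries = {}

        def prefill(self, event_id, check_redacted, get_prev_content, event):
            self._entries[(event_id, check_redacted, get_prev_content)] = event

        def get(self, event_id, check_redacted, get_prev_content):
            return self._entries[(event_id, check_redacted, get_prev_content)]

        def invalidate(self, event_id):
            # A redaction changes what *every* rendering should return, so
            # all four (check_redacted, get_prev_content) combinations go.
            for flags in product((False, True), repeat=2):
                self._entries.pop((event_id,) + flags, None)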
From 80a167b1f032ae9bcb75bdb20c2112a779dda516 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 May 2015 11:18:22 +0100 Subject: Add comments --- synapse/storage/events.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 656e57b5c6..3d798f5fa2 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -35,6 +35,16 @@ import simplejson as json logger = logging.getLogger(__name__) +# These values are used in the `_enqueue_events` and `_do_fetch` methods to +# control how we batch/bulk fetch events from the database. +# The values are plucked out of thin air to make initial sync run faster +# on jki.re +# TODO: Make these configurable. +EVENT_QUEUE_THREADS = 3 # Max number of threads that will fetch events +EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for events +EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for events + + class EventsStore(SQLBaseStore): @defer.inlineCallbacks @log_function @@ -518,11 +528,12 @@ class EventsStore(SQLBaseStore): self._event_fetch_list = [] if not event_list: - if self.database_engine.single_threaded or i > 3: + single_threaded = self.database_engine.single_threaded + if single_threaded or i > EVENT_QUEUE_ITERATIONS: self._event_fetch_ongoing -= 1 return else: - self._event_fetch_lock.wait(0.1) + self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) i += 1 continue i = 0 @@ -584,7 +595,7 @@ class EventsStore(SQLBaseStore): self._event_fetch_lock.notify() - if self._event_fetch_ongoing < 3: + if self._event_fetch_ongoing < EVENT_QUEUE_THREADS: self._event_fetch_ongoing += 1 should_start = True else: -- cgit 1.4.1 From ac5f2bf9db9f37d2cb076cfa3a0912ecb948f508 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 May 2015 14:50:57 +0100 Subject: s/for events/for requests for events/ --- synapse/storage/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 3d798f5fa2..84a799bcff 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -42,7 +42,7 @@ logger = logging.getLogger(__name__) # TODO: Make these configurable. EVENT_QUEUE_THREADS = 3 # Max number of threads that will fetch events EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for events -EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for events +EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events -- cgit 1.4.1 From 27e4b45c0692489a11e4be2b05f3fcb99b3c7489 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 May 2015 14:52:23 +0100 Subject: s/for events/for requests for events/ --- synapse/storage/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 84a799bcff..74946eac3e 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -41,7 +41,7 @@ logger = logging.getLogger(__name__) # on jki.re # TODO: Make these configurable. EVENT_QUEUE_THREADS = 3 # Max number of threads that will fetch events -EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for events +EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for requests for events EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events -- cgit 1.4.1
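Taken together, the series lands on a simple shape: callers append (event_ids, Deferred) pairs to a shared list under a Condition; at most EVENT_QUEUE_THREADS database threads drain that list, fetch all the ids in one round trip, and hand results back via reactor.callFromThread, since Deferreds may only be touched from the reactor thread. A compressed sketch of that architecture follows — BatchedFetcher is a made-up name, db_fetch is a hypothetical blocking function, and the real code adds error handling, log contexts and the idle wait shown earlier:

    import threading
    from twisted.internet import defer, reactor

    EVENT_QUEUE_THREADS = 3

    class BatchedFetcher(object):
        def __init__(self, db_fetch):
            self._db_fetch = db_fetch   # blocking: [event_id] -> {event_id: row}
            self._lock = threading.Condition()
            self._queue = []            # [(event_ids, deferred)]
            self._ongoing = 0

        def fetch(self, event_ids):
            """Called on the reactor thread; returns a Deferred of rows."""
            d = defer.Deferred()
            with self._lock:
                self._queue.append((event_ids, d))
                self._lock.notify()
                if self._ongoing < EVENT_QUEUE_THREADS:
                    self._ongoing += 1
                    reactor.callInThread(self._worker)
            return d

        def _worker(self):
            # Runs in a thread from the reactor's pool.
            while True:
                with self._lock:
                    if not self._queue:
                        self._ongoing -= 1
                        return
                    batch, self._queue = self._queue, []
                all_ids = [i for ids, _ in batch for i in ids]
                rows = self._db_fetch(all_ids)  # one query serves many callers
                for ids, d in batch:
                    # Deferreds are not thread-safe: resolve them on the
                    # reactor thread, passing values as arguments rather
                    # than closing over the loop variables.
                    reactor.callFromThread(
                        d.callback, [rows[i] for i in ids if i in rows]
                    )

The real implementation reaches the same effect through runWithConnection, so each worker owns an actual database connection and opens its own transactions instead of going back through runInteraction.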