Diffstat (limited to 'synapse/storage')
-rw-r--r--   synapse/storage/_base.py  |  26
-rw-r--r--   synapse/storage/events.py | 125
-rw-r--r--   synapse/storage/stream.py |   2
3 files changed, 85 insertions, 68 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index ceff99c16d..0df1b46edc 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -301,10 +301,12 @@ class SQLBaseStore(object):
         self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True,
                                       max_entries=hs.config.event_cache_size)

-        self._event_fetch_lock = threading.Condition()
+        self._event_fetch_lock = threading.Lock()
         self._event_fetch_list = []
         self._event_fetch_ongoing = 0

+        self._pending_ds = []
+
         self.database_engine = hs.database_engine

         self._stream_id_gen = StreamIdGenerator()
@@ -344,8 +346,7 @@ class SQLBaseStore(object):

         self._clock.looping_call(loop, 10000)

-    @contextlib.contextmanager
-    def _new_transaction(self, conn, desc, after_callbacks):
+    def _new_transaction(self, conn, desc, after_callbacks, func, *args, **kwargs):
         start = time.time() * 1000
         txn_id = self._TXN_ID

@@ -366,6 +367,9 @@ class SQLBaseStore(object):
                     txn = LoggingTransaction(
                         txn, name, self.database_engine, after_callbacks
                     )
+                    r = func(txn, *args, **kwargs)
+                    conn.commit()
+                    return r
                 except self.database_engine.module.OperationalError as e:
                     # This can happen if the database disappears mid
                     # transaction.
@@ -398,17 +402,6 @@ class SQLBaseStore(object):
                             )
                             continue
                     raise
-
-                try:
-                    yield txn
-                    conn.commit()
-                    return
-                except:
-                    try:
-                        conn.rollback()
-                    except:
-                        pass
-                    raise
         except Exception as e:
             logger.debug("[TXN FAIL] {%s} %s", name, e)
             raise
@@ -440,8 +433,9 @@ class SQLBaseStore(object):
                     conn.reconnect()

                 current_context.copy_to(context)
-                with self._new_transaction(conn, desc, after_callbacks) as txn:
-                    return func(txn, *args, **kwargs)
+                return self._new_transaction(
+                    conn, desc, after_callbacks, func, *args, **kwargs
+                )

             result = yield preserve_context_over_fn(
                 self._db_pool.runWithConnection,
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index b4abd83260..260bdf0ec4 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -420,12 +420,14 @@ class EventsStore(SQLBaseStore):
         ])

         if not txn:
+            logger.debug("enqueue before")
             missing_events = yield self._enqueue_events(
                 missing_events_ids,
                 check_redacted=check_redacted,
                 get_prev_content=get_prev_content,
                 allow_rejected=allow_rejected,
             )
+            logger.debug("enqueue after")
         else:
             missing_events = self._fetch_events_txn(
                 txn,
@@ -498,41 +500,39 @@ class EventsStore(SQLBaseStore):
             allow_rejected=allow_rejected,
         ))

-    @defer.inlineCallbacks
-    def _enqueue_events(self, events, check_redacted=True,
-                        get_prev_content=False, allow_rejected=False):
-        if not events:
-            defer.returnValue({})
-
-        def do_fetch(conn):
-            event_list = []
+    def _do_fetch(self, conn):
+        event_list = []
+        try:
             while True:
-                try:
-                    with self._event_fetch_lock:
-                        i = 0
-                        while not self._event_fetch_list:
-                            self._event_fetch_ongoing -= 1
-                            return
-
-                        event_list = self._event_fetch_list
-                        self._event_fetch_list = []
-
-                    event_id_lists = zip(*event_list)[0]
-                    event_ids = [
-                        item for sublist in event_id_lists for item in sublist
-                    ]
-
-                    with self._new_transaction(conn, "do_fetch", []) as txn:
-                        rows = self._fetch_event_rows(txn, event_ids)
-
-                    row_dict = {
-                        r["event_id"]: r
-                        for r in rows
-                    }
+                logger.debug("do_fetch getting lock")
+                with self._event_fetch_lock:
+                    logger.debug("do_fetch go lock: %r", self._event_fetch_list)
+                    event_list = self._event_fetch_list
+                    self._event_fetch_list = []
+                    if not event_list:
+                        self._event_fetch_ongoing -= 1
+                        return
+
+                event_id_lists = zip(*event_list)[0]
+                event_ids = [
+                    item for sublist in event_id_lists for item in sublist
+                ]
+
+                rows = self._new_transaction(
+                    conn, "do_fetch", [], self._fetch_event_rows, event_ids
+                )

-                    for ids, d in event_list:
-                        def fire():
-                            if not d.called:
+                row_dict = {
+                    r["event_id"]: r
+                    for r in rows
+                }
+
+                logger.debug("do_fetch got events: %r", row_dict.keys())
+
+                def fire(evs):
+                    for ids, d in evs:
+                        if not d.called:
+                            try:
                                 d.callback(
                                     [
                                         row_dict[i]
@@ -540,32 +540,51 @@ class EventsStore(SQLBaseStore):
                                         if i in row_dict
                                     ]
                                 )
-                        reactor.callFromThread(fire)
-                except Exception as e:
-                    logger.exception("do_fetch")
-                    for _, d in event_list:
-                        if not d.called:
-                            reactor.callFromThread(d.errback, e)
+                            except:
+                                logger.exception("Failed to callback")
+                reactor.callFromThread(fire, event_list)
+        except Exception as e:
+            logger.exception("do_fetch")

-                    with self._event_fetch_lock:
-                        self._event_fetch_ongoing -= 1
-                        return
+            def fire(evs):
+                for _, d in evs:
+                    if not d.called:
+                        d.errback(e)
+
+            if event_list:
+                reactor.callFromThread(fire, event_list)
+
+    @defer.inlineCallbacks
+    def _enqueue_events(self, events, check_redacted=True,
+                        get_prev_content=False, allow_rejected=False):
+        if not events:
+            defer.returnValue({})

         events_d = defer.Deferred()
-        with self._event_fetch_lock:
-            self._event_fetch_list.append(
-                (events, events_d)
-            )
+        try:
+            logger.debug("enqueueueueue getting lock")
+            with self._event_fetch_lock:
+                logger.debug("enqueue go lock")
+                self._event_fetch_list.append(
+                    (events, events_d)
+                )

-            self._event_fetch_lock.notify_all()
+                self._event_fetch_ongoing += 1

-            # if self._event_fetch_ongoing < 5:
-            self._event_fetch_ongoing += 1
             self.runWithConnection(
-                do_fetch
+                self._do_fetch
             )
-        rows = yield events_d
+        except Exception as e:
+            if not events_d.called:
+                events_d.errback(e)
+
+        logger.debug("events_d before")
+        try:
+            rows = yield events_d
+        except:
+            logger.exception("events_d")
+        logger.debug("events_d after")

         res = yield defer.gatherResults(
             [
                 self._get_event_from_row(
@@ -580,6 +599,7 @@ class EventsStore(SQLBaseStore):
             ],
             consumeErrors=True
         )
+        logger.debug("gatherResults after")

         defer.returnValue({
             e.event_id: e
@@ -639,7 +659,8 @@ class EventsStore(SQLBaseStore):
                     rejected_reason=row["rejects"],
                 )
                 for row in rows
-            ]
+            ],
+            consumeErrors=True,
         )

         defer.returnValue({
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index d16b57c515..af45fc5619 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -357,10 +357,12 @@ class StreamStore(SQLBaseStore):
             "get_recent_events_for_room", get_recent_events_for_room_txn
         )

+        logger.debug("stream before")
         events = yield self._get_events(
             [r["event_id"] for r in rows],
             get_prev_content=True
         )
+        logger.debug("stream after")

         self._set_before_and_after(events, rows)
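The _base.py hunks above change `_new_transaction` from a context manager that yielded a transaction into a helper that receives the transaction function, runs it, and commits (or propagates the failure) itself, which is what lets `_do_fetch` call it directly with `self._fetch_event_rows`. Below is a minimal sketch of that calling pattern under stated assumptions: it uses `sqlite3`, a throwaway `events` table and a hypothetical `_insert_event` function, and is not Synapse's actual `SQLBaseStore` implementation (the retry loop and logging-context handling from the real code are omitted).

```python
# A minimal sketch, not Synapse's SQLBaseStore: the helper owns the whole
# transaction lifecycle instead of yielding a cursor from a context manager.
# sqlite3, the events table and _insert_event are illustrative assumptions.
import sqlite3


def new_transaction(conn, desc, func, *args, **kwargs):
    """Run func(txn, *args, **kwargs) inside a transaction, commit, return its result."""
    # desc mirrors the description argument in the diff's signature; unused here
    txn = conn.cursor()
    try:
        result = func(txn, *args, **kwargs)
        conn.commit()
        return result
    except Exception:
        conn.rollback()
        raise
    finally:
        txn.close()


def _insert_event(txn, event_id, body):
    # the function passed in only ever sees the cursor, like the refactored
    # _new_transaction(conn, desc, after_callbacks, func, *args, **kwargs)
    txn.execute("INSERT INTO events (event_id, body) VALUES (?, ?)", (event_id, body))
    return txn.lastrowid


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (event_id TEXT, body TEXT)")
    print(new_transaction(conn, "insert_event", _insert_event, "$ev1", "hello"))
```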
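The events.py hunks replace the Condition-based queue with a plain `threading.Lock` around a shared `_event_fetch_list`: `_enqueue_events` appends `(event_ids, deferred)` pairs and kicks off `_do_fetch`, which drains the whole list on each pass, does one batched row fetch, and fires every waiter's Deferred via `reactor.callFromThread`. The sketch below reproduces only that queue-and-drain shape: plain threads and a `threading.Event` per waiter stand in for Twisted's reactor and Deferreds, an in-memory dict stands in for the database, and (unlike the diff, which starts a fetcher on every enqueue) it starts a fetcher only when none is running. Everything except the `_event_fetch_*` names echoed from the diff is an assumption for illustration.

```python
# Sketch of the batching shape in _enqueue_events/_do_fetch; not Synapse code.
import threading

DB = {"$a": {"event_id": "$a"}, "$b": {"event_id": "$b"}, "$c": {"event_id": "$c"}}

_event_fetch_lock = threading.Lock()
_event_fetch_list = []      # pending (event_ids, results, done) requests
_event_fetch_ongoing = 0


def _do_fetch():
    """Fetcher: drain the pending list in batches until it is empty, then exit."""
    global _event_fetch_ongoing
    while True:
        with _event_fetch_lock:
            event_list = list(_event_fetch_list)
            del _event_fetch_list[:]
            if not event_list:
                _event_fetch_ongoing -= 1
                return
        # one batched "query" covering every id any waiter asked for
        event_ids = [i for ids, _, _ in event_list for i in ids]
        row_dict = {i: DB[i] for i in event_ids if i in DB}
        for ids, results, done in event_list:
            results.extend(row_dict[i] for i in ids if i in row_dict)
            done.set()


def enqueue_events(event_ids):
    """Queue a request, start a fetcher if none is running, wait for the rows."""
    global _event_fetch_ongoing
    results, done = [], threading.Event()
    with _event_fetch_lock:
        _event_fetch_list.append((event_ids, results, done))
        start_fetcher = _event_fetch_ongoing == 0
        if start_fetcher:
            _event_fetch_ongoing += 1
    if start_fetcher:
        threading.Thread(target=_do_fetch).start()
    done.wait()
    return results


if __name__ == "__main__":
    print(enqueue_events(["$a", "$c"]))
```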