From 476899295f5fd6cff64799bcbc84cd4bf9005e33 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 8 May 2015 16:32:18 +0100 Subject: Change the way we do logging contexts so that they survive divergences --- synapse/federation/federation_server.py | 46 ++++++++++++++++----------------- 1 file changed, 22 insertions(+), 24 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 2b46188c91..cd79e23f4b 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -20,7 +20,6 @@ from .federation_base import FederationBase from .units import Transaction, Edu from synapse.util.logutils import log_function -from synapse.util.logcontext import PreserveLoggingContext from synapse.events import FrozenEvent import synapse.metrics @@ -123,29 +122,28 @@ class FederationServer(FederationBase): logger.debug("[%s] Transaction is new", transaction.transaction_id) - with PreserveLoggingContext(): - results = [] - - for pdu in pdu_list: - d = self._handle_new_pdu(transaction.origin, pdu) - - try: - yield d - results.append({}) - except FederationError as e: - self.send_failure(e, transaction.origin) - results.append({"error": str(e)}) - except Exception as e: - results.append({"error": str(e)}) - logger.exception("Failed to handle PDU") - - if hasattr(transaction, "edus"): - for edu in [Edu(**x) for x in transaction.edus]: - self.received_edu( - transaction.origin, - edu.edu_type, - edu.content - ) + results = [] + + for pdu in pdu_list: + d = self._handle_new_pdu(transaction.origin, pdu) + + try: + yield d + results.append({}) + except FederationError as e: + self.send_failure(e, transaction.origin) + results.append({"error": str(e)}) + except Exception as e: + results.append({"error": str(e)}) + logger.exception("Failed to handle PDU") + + if hasattr(transaction, "edus"): + for edu in [Edu(**x) for x in transaction.edus]: + self.received_edu( + transaction.origin, + edu.edu_type, + edu.content + ) for failure in getattr(transaction, "pdu_failures", []): logger.info("Got failure %r", failure) -- cgit 1.5.1 From 95dedb866f04ee4ae034c35130f2a8dc86243fbb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 May 2015 13:14:29 +0100 Subject: Unwrap defer.gatherResults failures --- synapse/federation/federation_base.py | 4 +++- synapse/handlers/federation.py | 3 ++- synapse/handlers/message.py | 5 +++-- synapse/handlers/profile.py | 3 ++- synapse/handlers/room.py | 4 ++-- synapse/util/__init__.py | 6 ++++++ 6 files changed, 18 insertions(+), 7 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 21a763214b..5217d91aab 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -24,6 +24,8 @@ from synapse.crypto.event_signing import check_event_content_hash from synapse.api.errors import SynapseError +from synapse.util import unwrapFirstError + import logging @@ -94,7 +96,7 @@ class FederationBase(object): yield defer.gatherResults( [do(pdu) for pdu in pdus], consumeErrors=True - ) + ).addErrback(unwrapFirstError) defer.returnValue(signed_pdus) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 77c315c47c..cd85001578 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -21,6 +21,7 @@ from synapse.api.errors import ( AuthError, FederationError, StoreError, ) from synapse.api.constants 
import EventTypes, Membership, RejectedReason +from synapse.util import unwrapFirstError from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor @@ -926,7 +927,7 @@ class FederationHandler(BaseHandler): if d in have_events and not have_events[d] ], consumeErrors=True - ) + ).addErrback(unwrapFirstError) if different_events: local_view = dict(auth_events) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 22e19af17f..b7d52647d7 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -20,6 +20,7 @@ from synapse.api.errors import RoomError, SynapseError from synapse.streams.config import PaginationConfig from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator +from synapse.util import unwrapFirstError from synapse.util.logcontext import PreserveLoggingContext from synapse.types import UserID @@ -303,7 +304,7 @@ class MessageHandler(BaseHandler): event.room_id ), ] - ) + ).addErrback(unwrapFirstError) start_token = now_token.copy_and_replace("room_key", token[0]) end_token = now_token.copy_and_replace("room_key", token[1]) @@ -328,7 +329,7 @@ class MessageHandler(BaseHandler): yield defer.gatherResults( [handle_room(e) for e in room_list], consumeErrors=True - ) + ).addErrback(unwrapFirstError) ret = { "rooms": rooms_ret, diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index ffb449d457..71ff78ab23 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError, CodeMessageException from synapse.api.constants import EventTypes, Membership from synapse.types import UserID +from synapse.util import unwrapFirstError from ._base import BaseHandler @@ -159,7 +160,7 @@ class ProfileHandler(BaseHandler): self.store.get_profile_avatar_url(user.localpart), ], consumeErrors=True - ) + ).addErrback(unwrapFirstError) state["displayname"] = displayname state["avatar_url"] = avatar_url diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index cfa2e38ed2..ea5abba6ab 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -21,7 +21,7 @@ from ._base import BaseHandler from synapse.types import UserID, RoomAlias, RoomID from synapse.api.constants import EventTypes, Membership, JoinRules from synapse.api.errors import StoreError, SynapseError -from synapse.util import stringutils +from synapse.util import stringutils, unwrapFirstError from synapse.util.async import run_on_reactor from synapse.events.utils import serialize_event @@ -537,7 +537,7 @@ class RoomListHandler(BaseHandler): for room in chunk ], consumeErrors=True, - ) + ).addErrback(unwrapFirstError) for i, room in enumerate(chunk): room["num_joined_members"] = len(results[i]) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index fd3eb1f574..c1a16b639a 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -23,6 +23,12 @@ import logging logger = logging.getLogger(__name__) +def unwrapFirstError(failure): + # defer.gatherResults and DeferredLists wrap failures. + failure.trap(defer.FirstError) + return failure.value.subFailure + + class Clock(object): """A small utility that obtains current time-of-day so that time may be mocked during unit-tests. 
-- cgit 1.5.1 From a2c4f3f150f63c720370f6882da804c8ac20fd69 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 10:54:04 +0100 Subject: Fix daedlock --- synapse/federation/federation_client.py | 15 +++- synapse/federation/federation_server.py | 2 + synapse/handlers/message.py | 33 ++++++--- synapse/storage/_base.py | 26 +++---- synapse/storage/events.py | 125 +++++++++++++++++++------------- synapse/storage/stream.py | 2 + tests/storage/test_base.py | 3 +- 7 files changed, 122 insertions(+), 84 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 904c7c0945..c255df1bbb 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -222,7 +222,7 @@ class FederationClient(FederationBase): for p in transaction_data["pdus"] ] - if pdu_list: + if pdu_list and pdu_list[0]: pdu = pdu_list[0] # Check signatures are correct. @@ -255,7 +255,7 @@ class FederationClient(FederationBase): ) continue - if self._get_pdu_cache is not None: + if self._get_pdu_cache is not None and pdu: self._get_pdu_cache[event_id] = pdu defer.returnValue(pdu) @@ -475,6 +475,9 @@ class FederationClient(FederationBase): limit (int): Maximum number of events to return. min_depth (int): Minimum depth of events tor return. """ + logger.debug("get_missing_events: latest_events: %r", latest_events) + logger.debug("get_missing_events: earliest_events_ids: %r", earliest_events_ids) + try: content = yield self.transport_layer.get_missing_events( destination=destination, @@ -485,6 +488,8 @@ class FederationClient(FederationBase): min_depth=min_depth, ) + logger.debug("get_missing_events: Got content: %r", content) + events = [ self.event_from_pdu_json(e) for e in content.get("events", []) @@ -494,6 +499,8 @@ class FederationClient(FederationBase): destination, events, outlier=False ) + logger.debug("get_missing_events: signed_events: %r", signed_events) + have_gotten_all_from_destination = True except HttpResponseException as e: if not e.code == 400: @@ -518,6 +525,8 @@ class FederationClient(FederationBase): # Are we missing any? 
seen_events = set(earliest_events_ids) + + logger.debug("get_missing_events: signed_events2: %r", signed_events) seen_events.update(e.event_id for e in signed_events) missing_events = {} @@ -561,7 +570,7 @@ class FederationClient(FederationBase): res = yield defer.DeferredList(deferreds, consumeErrors=True) for (result, val), (e_id, _) in zip(res, ordered_missing): - if result: + if result and val: signed_events.append(val) else: failed_to_fetch.add(e_id) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index cd79e23f4b..2c6488dd1b 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -415,6 +415,8 @@ class FederationServer(FederationBase): pdu.internal_metadata.outlier = True elif min_depth and pdu.depth > min_depth: if get_missing and prevs - seen: + logger.debug("We're missing: %r", prevs-seen) + latest = yield self.store.get_latest_event_ids_in_room( pdu.room_id ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 867fdbefb0..6a1b25d112 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -303,18 +303,27 @@ class MessageHandler(BaseHandler): if event.membership != Membership.JOIN: return try: - (messages, token), current_state = yield defer.gatherResults( - [ - self.store.get_recent_events_for_room( - event.room_id, - limit=limit, - end_token=now_token.room_key, - ), - self.state_handler.get_current_state( - event.room_id - ), - ] - ).addErrback(unwrapFirstError) + # (messages, token), current_state = yield defer.gatherResults( + # [ + # self.store.get_recent_events_for_room( + # event.room_id, + # limit=limit, + # end_token=now_token.room_key, + # ), + # self.state_handler.get_current_state( + # event.room_id + # ), + # ] + # ).addErrback(unwrapFirstError) + + messages, token = yield self.store.get_recent_events_for_room( + event.room_id, + limit=limit, + end_token=now_token.room_key, + ) + current_state = yield self.state_handler.get_current_state( + event.room_id + ) start_token = now_token.copy_and_replace("room_key", token[0]) end_token = now_token.copy_and_replace("room_key", token[1]) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index ceff99c16d..0df1b46edc 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -301,10 +301,12 @@ class SQLBaseStore(object): self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, max_entries=hs.config.event_cache_size) - self._event_fetch_lock = threading.Condition() + self._event_fetch_lock = threading.Lock() self._event_fetch_list = [] self._event_fetch_ongoing = 0 + self._pending_ds = [] + self.database_engine = hs.database_engine self._stream_id_gen = StreamIdGenerator() @@ -344,8 +346,7 @@ class SQLBaseStore(object): self._clock.looping_call(loop, 10000) - @contextlib.contextmanager - def _new_transaction(self, conn, desc, after_callbacks): + def _new_transaction(self, conn, desc, after_callbacks, func, *args, **kwargs): start = time.time() * 1000 txn_id = self._TXN_ID @@ -366,6 +367,9 @@ class SQLBaseStore(object): txn = LoggingTransaction( txn, name, self.database_engine, after_callbacks ) + r = func(txn, *args, **kwargs) + conn.commit() + return r except self.database_engine.module.OperationalError as e: # This can happen if the database disappears mid # transaction. 
@@ -398,17 +402,6 @@ class SQLBaseStore(object): ) continue raise - - try: - yield txn - conn.commit() - return - except: - try: - conn.rollback() - except: - pass - raise except Exception as e: logger.debug("[TXN FAIL] {%s} %s", name, e) raise @@ -440,8 +433,9 @@ class SQLBaseStore(object): conn.reconnect() current_context.copy_to(context) - with self._new_transaction(conn, desc, after_callbacks) as txn: - return func(txn, *args, **kwargs) + return self._new_transaction( + conn, desc, after_callbacks, func, *args, **kwargs + ) result = yield preserve_context_over_fn( self._db_pool.runWithConnection, diff --git a/synapse/storage/events.py b/synapse/storage/events.py index b4abd83260..260bdf0ec4 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -420,12 +420,14 @@ class EventsStore(SQLBaseStore): ]) if not txn: + logger.debug("enqueue before") missing_events = yield self._enqueue_events( missing_events_ids, check_redacted=check_redacted, get_prev_content=get_prev_content, allow_rejected=allow_rejected, ) + logger.debug("enqueue after") else: missing_events = self._fetch_events_txn( txn, @@ -498,41 +500,39 @@ class EventsStore(SQLBaseStore): allow_rejected=allow_rejected, )) - @defer.inlineCallbacks - def _enqueue_events(self, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): - if not events: - defer.returnValue({}) - - def do_fetch(conn): - event_list = [] + def _do_fetch(self, conn): + event_list = [] + try: while True: - try: - with self._event_fetch_lock: - i = 0 - while not self._event_fetch_list: - self._event_fetch_ongoing -= 1 - return - - event_list = self._event_fetch_list - self._event_fetch_list = [] - - event_id_lists = zip(*event_list)[0] - event_ids = [ - item for sublist in event_id_lists for item in sublist - ] - - with self._new_transaction(conn, "do_fetch", []) as txn: - rows = self._fetch_event_rows(txn, event_ids) - - row_dict = { - r["event_id"]: r - for r in rows - } + logger.debug("do_fetch getting lock") + with self._event_fetch_lock: + logger.debug("do_fetch go lock: %r", self._event_fetch_list) + event_list = self._event_fetch_list + self._event_fetch_list = [] + if not event_list: + self._event_fetch_ongoing -= 1 + return + + event_id_lists = zip(*event_list)[0] + event_ids = [ + item for sublist in event_id_lists for item in sublist + ] + + rows = self._new_transaction( + conn, "do_fetch", [], self._fetch_event_rows, event_ids + ) - for ids, d in event_list: - def fire(): - if not d.called: + row_dict = { + r["event_id"]: r + for r in rows + } + + logger.debug("do_fetch got events: %r", row_dict.keys()) + + def fire(evs): + for ids, d in evs: + if not d.called: + try: d.callback( [ row_dict[i] @@ -540,32 +540,51 @@ class EventsStore(SQLBaseStore): if i in row_dict ] ) - reactor.callFromThread(fire) - except Exception as e: - logger.exception("do_fetch") - for _, d in event_list: - if not d.called: - reactor.callFromThread(d.errback, e) + except: + logger.exception("Failed to callback") + reactor.callFromThread(fire, event_list) + except Exception as e: + logger.exception("do_fetch") - with self._event_fetch_lock: - self._event_fetch_ongoing -= 1 - return + def fire(evs): + for _, d in evs: + if not d.called: + d.errback(e) + + if event_list: + reactor.callFromThread(fire, event_list) + + @defer.inlineCallbacks + def _enqueue_events(self, events, check_redacted=True, + get_prev_content=False, allow_rejected=False): + if not events: + defer.returnValue({}) events_d = defer.Deferred() - with self._event_fetch_lock: 
- self._event_fetch_list.append( - (events, events_d) - ) + try: + logger.debug("enqueueueueue getting lock") + with self._event_fetch_lock: + logger.debug("enqueue go lock") + self._event_fetch_list.append( + (events, events_d) + ) - self._event_fetch_lock.notify_all() + self._event_fetch_ongoing += 1 - # if self._event_fetch_ongoing < 5: - self._event_fetch_ongoing += 1 self.runWithConnection( - do_fetch + self._do_fetch ) - rows = yield events_d + except Exception as e: + if not events_d.called: + events_d.errback(e) + + logger.debug("events_d before") + try: + rows = yield events_d + except: + logger.exception("events_d") + logger.debug("events_d after") res = yield defer.gatherResults( [ @@ -580,6 +599,7 @@ class EventsStore(SQLBaseStore): ], consumeErrors=True ) + logger.debug("gatherResults after") defer.returnValue({ e.event_id: e @@ -639,7 +659,8 @@ class EventsStore(SQLBaseStore): rejected_reason=row["rejects"], ) for row in rows - ] + ], + consumeErrors=True, ) defer.returnValue({ diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index d16b57c515..af45fc5619 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -357,10 +357,12 @@ class StreamStore(SQLBaseStore): "get_recent_events_for_room", get_recent_events_for_room_txn ) + logger.debug("stream before") events = yield self._get_events( [r["event_id"] for r in rows], get_prev_content=True ) + logger.debug("stream after") self._set_before_and_after(events, rows) diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py index 8c348ecc95..8573f18b55 100644 --- a/tests/storage/test_base.py +++ b/tests/storage/test_base.py @@ -33,8 +33,9 @@ class SQLBaseStoreTestCase(unittest.TestCase): def setUp(self): self.db_pool = Mock(spec=["runInteraction"]) self.mock_txn = Mock() - self.mock_conn = Mock(spec_set=["cursor"]) + self.mock_conn = Mock(spec_set=["cursor", "rollback", "commit"]) self.mock_conn.cursor.return_value = self.mock_txn + self.mock_conn.rollback.return_value = None # Our fake runInteraction just runs synchronously inline def runInteraction(func, *args, **kwargs): -- cgit 1.5.1 From f8bd4de87de9464c54dcc50371866e3537754b9b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 May 2015 09:58:03 +0100 Subject: Remove debug logging --- synapse/federation/federation_client.py | 8 -------- 1 file changed, 8 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index c255df1bbb..fe5a7a9fa0 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -475,9 +475,6 @@ class FederationClient(FederationBase): limit (int): Maximum number of events to return. min_depth (int): Minimum depth of events tor return. 
""" - logger.debug("get_missing_events: latest_events: %r", latest_events) - logger.debug("get_missing_events: earliest_events_ids: %r", earliest_events_ids) - try: content = yield self.transport_layer.get_missing_events( destination=destination, @@ -488,8 +485,6 @@ class FederationClient(FederationBase): min_depth=min_depth, ) - logger.debug("get_missing_events: Got content: %r", content) - events = [ self.event_from_pdu_json(e) for e in content.get("events", []) @@ -499,8 +494,6 @@ class FederationClient(FederationBase): destination, events, outlier=False ) - logger.debug("get_missing_events: signed_events: %r", signed_events) - have_gotten_all_from_destination = True except HttpResponseException as e: if not e.code == 400: @@ -526,7 +519,6 @@ class FederationClient(FederationBase): seen_events = set(earliest_events_ids) - logger.debug("get_missing_events: signed_events2: %r", signed_events) seen_events.update(e.event_id for e in signed_events) missing_events = {} -- cgit 1.5.1 From c71176858b9d58cfbe5520ad1dac8191c005fdc9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 May 2015 10:11:14 +0100 Subject: Newline, remove debug logging --- synapse/federation/federation_server.py | 2 -- synapse/storage/_base.py | 1 - synapse/storage/schema/delta/19/event_index.sql | 2 +- 3 files changed, 1 insertion(+), 4 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 2c6488dd1b..cd79e23f4b 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -415,8 +415,6 @@ class FederationServer(FederationBase): pdu.internal_metadata.outlier = True elif min_depth and pdu.depth > min_depth: if get_missing and prevs - seen: - logger.debug("We're missing: %r", prevs-seen) - latest = yield self.store.get_latest_event_ids_in_room( pdu.room_id ) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index b529e0543e..d1f050394d 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -15,7 +15,6 @@ import logging from synapse.api.errors import StoreError - from synapse.util.logutils import log_function from synapse.util.logcontext import preserve_context_over_fn, LoggingContext from synapse.util.lrucache import LruCache diff --git a/synapse/storage/schema/delta/19/event_index.sql b/synapse/storage/schema/delta/19/event_index.sql index f3792817bb..3881fc9897 100644 --- a/synapse/storage/schema/delta/19/event_index.sql +++ b/synapse/storage/schema/delta/19/event_index.sql @@ -16,4 +16,4 @@ CREATE INDEX events_order_topo_stream_room ON events( topological_ordering, stream_ordering, room_id -); \ No newline at end of file +); -- cgit 1.5.1 From d5cea26d45a50053fcb16296b73bbced49675a74 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 May 2015 10:16:45 +0100 Subject: Remove pointless newline --- synapse/federation/federation_client.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index fe5a7a9fa0..3a7bc0c9a7 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -518,7 +518,6 @@ class FederationClient(FederationBase): # Are we missing any? 
seen_events = set(earliest_events_ids) - seen_events.update(e.event_id for e in signed_events) missing_events = {} -- cgit 1.5.1 From 5b1631a4a9ad4c1ed0adaff3ffc8238014359e95 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 May 2015 14:53:32 +0100 Subject: Add a timeout param to get_event --- synapse/federation/federation_base.py | 1 + synapse/federation/federation_client.py | 23 ++++++++++++++--------- synapse/federation/transport/client.py | 4 ++-- synapse/http/matrixfederationclient.py | 13 ++++++++----- 4 files changed, 25 insertions(+), 16 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 5217d91aab..f0430b2cb1 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -80,6 +80,7 @@ class FederationBase(object): destinations=[pdu.origin], event_id=pdu.event_id, outlier=outlier, + timeout=10000, ) if new_pdu: diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 904c7c0945..a163b2674d 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -22,6 +22,7 @@ from .units import Edu from synapse.api.errors import ( CodeMessageException, HttpResponseException, SynapseError, ) +from synapse.util import unwrapFirstError from synapse.util.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.events import FrozenEvent @@ -173,7 +174,7 @@ class FederationClient(FederationBase): @defer.inlineCallbacks @log_function - def get_pdu(self, destinations, event_id, outlier=False): + def get_pdu(self, destinations, event_id, outlier=False, timeout=None): """Requests the PDU with given origin and ID from the remote home servers. @@ -212,7 +213,7 @@ class FederationClient(FederationBase): with limiter: transaction_data = yield self.transport_layer.get_event( - destination, event_id + destination, event_id, timeout=timeout, ) logger.debug("transaction_data %r", transaction_data) @@ -370,13 +371,17 @@ class FederationClient(FederationBase): for p in content.get("auth_chain", []) ] - signed_state = yield self._check_sigs_and_hash_and_fetch( - destination, state, outlier=True - ) - - signed_auth = yield self._check_sigs_and_hash_and_fetch( - destination, auth_chain, outlier=True - ) + signed_state, signed_auth = yield defer.gatherResults( + [ + self._check_sigs_and_hash_and_fetch( + destination, state, outlier=True + ), + self._check_sigs_and_hash_and_fetch( + destination, auth_chain, outlier=True + ) + ], + consumeErrors=True + ).addErrback(unwrapFirstError) auth_chain.sort(key=lambda e: e.depth) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 80d03012b7..c2b53b78b2 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -50,7 +50,7 @@ class TransportLayerClient(object): ) @log_function - def get_event(self, destination, event_id): + def get_event(self, destination, event_id, timeout=None): """ Requests the pdu with give id and origin from the given server. 
Args: @@ -65,7 +65,7 @@ class TransportLayerClient(object): destination, event_id) path = PREFIX + "/event/%s/" % (event_id, ) - return self.client.get_json(destination, path=path) + return self.client.get_json(destination, path=path, timeout=timeout) @log_function def backfill(self, destination, room_id, event_tuples, limit): diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index c99d237c73..312bbcc6b8 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -110,7 +110,8 @@ class MatrixFederationHttpClient(object): @defer.inlineCallbacks def _create_request(self, destination, method, path_bytes, body_callback, headers_dict={}, param_bytes=b"", - query_bytes=b"", retry_on_dns_fail=True): + query_bytes=b"", retry_on_dns_fail=True, + timeout=None): """ Creates and sends a request to the given url """ headers_dict[b"User-Agent"] = [self.version_string] @@ -158,7 +159,7 @@ class MatrixFederationHttpClient(object): response = yield self.clock.time_bound_deferred( request_deferred, - time_out=60, + time_out=timeout/1000. if timeout else 60, ) logger.debug("Got response to %s", method) @@ -181,7 +182,7 @@ class MatrixFederationHttpClient(object): _flatten_response_never_received(e), ) - if retries_left: + if retries_left and not timeout: yield sleep(2 ** (5 - retries_left)) retries_left -= 1 else: @@ -334,7 +335,8 @@ class MatrixFederationHttpClient(object): defer.returnValue(json.loads(body)) @defer.inlineCallbacks - def get_json(self, destination, path, args={}, retry_on_dns_fail=True): + def get_json(self, destination, path, args={}, retry_on_dns_fail=True, + timeout=None): """ GETs some json from the given host homeserver and path Args: @@ -370,7 +372,8 @@ class MatrixFederationHttpClient(object): path.encode("ascii"), query_bytes=query_bytes, body_callback=body_callback, - retry_on_dns_fail=retry_on_dns_fail + retry_on_dns_fail=retry_on_dns_fail, + timeout=timeout, ) if 200 <= response.code < 300: -- cgit 1.5.1 From 3a653515ec3bba9d5b143e37bc9569d5caa50a5b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 May 2015 15:27:09 +0100 Subject: Add None check --- synapse/federation/federation_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index a163b2674d..4b3bf97835 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -523,7 +523,7 @@ class FederationClient(FederationBase): # Are we missing any? seen_events = set(earliest_events_ids) - seen_events.update(e.event_id for e in signed_events) + seen_events.update(e.event_id for e in signed_events if e) missing_events = {} for e in itertools.chain(latest_events, signed_events): -- cgit 1.5.1 From 20814fabdd001ee6a04efc5277d71e80fdbf5a14 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 May 2015 11:59:02 +0100 Subject: Actually fetch state for new backwards extremeties when backfilling. 
--- synapse/federation/federation_client.py | 6 +- synapse/handlers/federation.py | 164 ++++++++++++++++++++------------ 2 files changed, 108 insertions(+), 62 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 4b3bf97835..6febc8618c 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -168,7 +168,11 @@ class FederationClient(FederationBase): for i, pdu in enumerate(pdus): pdus[i] = yield self._check_sigs_and_hash(pdu) - # FIXME: We should handle signature failures more gracefully. + # FIXME: We should handle signature failures more gracefully. + pdus[:] = yield defer.gatherResults( + [self._check_sigs_and_hash(pdu) for pdu in pdus], + consumeErrors=True, + ).addErrback(unwrapFirstError) defer.returnValue(pdus) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d85b1cf5de..46ce3699d7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -230,27 +230,65 @@ class FederationHandler(BaseHandler): if not extremities: extremities = yield self.store.get_oldest_events_in_room(room_id) - pdus = yield self.replication_layer.backfill( + events = yield self.replication_layer.backfill( dest, room_id, - limit, + limit=limit, extremities=extremities, ) - events = [] + event_map = {e.event_id: e for e in events} - for pdu in pdus: - event = pdu + event_ids = set(e.event_id for e in events) - # FIXME (erikj): Not sure this actually works :/ - context = yield self.state_handler.compute_event_context(event) + edges = [ + ev.event_id + for ev in events + if set(e_id for e_id, _ in ev.prev_events) - event_ids + ] - events.append((event, context)) + # For each edge get the current state. 
- yield self.store.persist_event( - event, - context=context, - backfilled=True + auth_events = {} + events_to_state = {} + for e_id in edges: + state, auth = yield self.replication_layer.get_state_for_room( + destination=dest, + room_id=room_id, + event_id=e_id + ) + auth_events.update({a.event_id: a for a in auth}) + events_to_state[e_id] = state + + yield defer.gatherResults( + [ + self._handle_new_event(dest, a) + for a in auth_events.values() + ], + consumeErrors=True, + ).addErrback(unwrapFirstError) + + yield defer.gatherResults( + [ + self._handle_new_event( + dest, event_map[e_id], + state=events_to_state[e_id], + backfilled=True, + ) + for e_id in events_to_state + ], + consumeErrors=True + ).addErrback(unwrapFirstError) + + events.sort(key=lambda e: e.depth) + + for event in events: + if event in events_to_state: + continue + + yield self._handle_new_event( + dest, event, + backfilled=True, ) defer.returnValue(events) @@ -347,7 +385,7 @@ class FederationHandler(BaseHandler): logger.info(e.message) continue except Exception as e: - logger.warn( + logger.exception( "Failed to backfill from %s because %s", dom, e, ) @@ -517,54 +555,9 @@ class FederationHandler(BaseHandler): # FIXME pass - auth_ids_to_deferred = {} - - def process_auth_ev(ev): - auth_ids = [e_id for e_id, _ in ev.auth_events] - - prev_ds = [ - auth_ids_to_deferred[i] - for i in auth_ids - if i in auth_ids_to_deferred - ] - - d = defer.Deferred() - - auth_ids_to_deferred[ev.event_id] = d - - @defer.inlineCallbacks - def f(*_): - ev.internal_metadata.outlier = True - - try: - auth = { - (e.type, e.state_key): e for e in auth_chain - if e.event_id in auth_ids - } - - yield self._handle_new_event( - origin, ev, auth_events=auth - ) - except: - logger.exception( - "Failed to handle auth event %s", - ev.event_id, - ) - - d.callback(None) - - if prev_ds: - dx = defer.DeferredList(prev_ds) - dx.addBoth(f) - else: - f() - - for e in auth_chain: - if e.event_id == event.event_id: - return - process_auth_ev(e) - - yield defer.DeferredList(auth_ids_to_deferred.values()) + yield self._handle_auth_events( + origin, [e for e in auth_chain if e.event_id != event.event_id] + ) @defer.inlineCallbacks def handle_state(e): @@ -1348,3 +1341,52 @@ class FederationHandler(BaseHandler): }, "missing": [e.event_id for e in missing_locals], }) + + @defer.inlineCallbacks + def _handle_auth_events(self, origin, auth_events): + auth_ids_to_deferred = {} + + def process_auth_ev(ev): + auth_ids = [e_id for e_id, _ in ev.auth_events] + + prev_ds = [ + auth_ids_to_deferred[i] + for i in auth_ids + if i in auth_ids_to_deferred + ] + + d = defer.Deferred() + + auth_ids_to_deferred[ev.event_id] = d + + @defer.inlineCallbacks + def f(*_): + ev.internal_metadata.outlier = True + + try: + auth = { + (e.type, e.state_key): e for e in auth_events + if e.event_id in auth_ids + } + + yield self._handle_new_event( + origin, ev, auth_events=auth + ) + except: + logger.exception( + "Failed to handle auth event %s", + ev.event_id, + ) + + d.callback(None) + + if prev_ds: + dx = defer.DeferredList(prev_ds) + dx.addBoth(f) + else: + f() + + for e in auth_events: + process_auth_ev(e) + + yield defer.DeferredList(auth_ids_to_deferred.values()) -- cgit 1.5.1 From 1b446a5d85f35d0af016496d9b12733ce667adb1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 May 2015 14:26:08 +0100 Subject: Log less lines at INFO level, but include more helpful information --- synapse/federation/transaction_queue.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 
deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index ca04822fb3..afe71897f9 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -207,13 +207,13 @@ class TransactionQueue(object): # request at which point pending_pdus_by_dest just keeps growing. # we need application-layer timeouts of some flavour of these # requests - logger.info( + logger.debug( "TX [%s] Transaction already in progress", destination ) return - logger.info("TX [%s] _attempt_new_transaction", destination) + logger.debug("TX [%s] _attempt_new_transaction", destination) # list of (pending_pdu, deferred, order) pending_pdus = self.pending_pdus_by_dest.pop(destination, []) @@ -221,11 +221,11 @@ class TransactionQueue(object): pending_failures = self.pending_failures_by_dest.pop(destination, []) if pending_pdus: - logger.info("TX [%s] len(pending_pdus_by_dest[dest]) = %d", - destination, len(pending_pdus)) + logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", + destination, len(pending_pdus)) if not pending_pdus and not pending_edus and not pending_failures: - logger.info("TX [%s] Nothing to send", destination) + logger.debug("TX [%s] Nothing to send", destination) return # Sort based on the order field @@ -275,9 +275,13 @@ class TransactionQueue(object): logger.debug("TX [%s] Persisted transaction", destination) logger.info( - "TX [%s] Sending transaction [%s]", + "TX [%s] Sending transaction [%s]," + " (PDUs: %d, EDUs: %d, failures: %d)", destination, transaction.transaction_id, + len(pending_pdus), + len(pending_edus), + len(pending_failures), ) with limiter: -- cgit 1.5.1 From e70e8e053e2d44d0f7cebe3bee7b4afbe103f0a7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 May 2015 14:33:11 +0100 Subject: Add txn_id to some log lines --- synapse/federation/transaction_queue.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index afe71897f9..32fa5e8c15 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -242,6 +242,8 @@ class TransactionQueue(object): try: self.pending_transactions[destination] = 1 + txn_id = str(self._next_txn_id) + limiter = yield get_retry_limiter( destination, self._clock, @@ -249,9 +251,9 @@ class TransactionQueue(object): ) logger.debug( - "TX [%s] Attempting new transaction" + "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d, failures: %d)", - destination, + destination, txn_id, len(pending_pdus), len(pending_edus), len(pending_failures) @@ -261,7 +263,7 @@ class TransactionQueue(object): transaction = Transaction.create_new( origin_server_ts=int(self._clock.time_msec()), - transaction_id=str(self._next_txn_id), + transaction_id=txn_id, origin=self.server_name, destination=destination, pdus=pdus, @@ -275,9 +277,9 @@ class TransactionQueue(object): logger.debug("TX [%s] Persisted transaction", destination) logger.info( - "TX [%s] Sending transaction [%s]," + "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d, failures: %d)", - destination, + destination, txn_id, transaction.transaction_id, len(pending_pdus), len(pending_edus), @@ -317,7 +319,10 @@ class TransactionQueue(object): code = e.code response = e.response - logger.info("TX [%s] got %d response", destination, code) + logger.info( + "TX [%s] {%s} got %d response", + 
destination, txn_id, code + ) logger.debug("TX [%s] Sent transaction", destination) logger.debug("TX [%s] Marking as delivered...", destination) -- cgit 1.5.1 From b21d015c55dde5255c7a63f8d17e77caf535030b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 May 2015 14:44:25 +0100 Subject: Log origin and stats of incoming transactions --- synapse/federation/transport/server.py | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'synapse/federation') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 2bfe0f3c9b..af87805f34 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -196,6 +196,14 @@ class FederationSendServlet(BaseFederationServlet): transaction_id, str(transaction_data) ) + logger.info( + "Received txn %s from %s. (PDUs: %d, EDUs: %d, failures: %d)", + transaction_id, origin, + len(transaction_data.get("pdus", [])), + len(transaction_data.get("edus", [])), + len(transaction_data.get("failures", [])), + ) + # We should ideally be getting this from the security layer. # origin = body["origin"] -- cgit 1.5.1 From 284f55a7fbf597e32508301fe9571cd1b8523625 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 May 2015 15:18:04 +0100 Subject: Add doc strings --- synapse/federation/federation_client.py | 2 ++ synapse/federation/transport/client.py | 2 ++ synapse/http/matrixfederationclient.py | 3 +++ 3 files changed, 7 insertions(+) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index ecb6dbd770..3249060bcf 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -190,6 +190,8 @@ class FederationClient(FederationBase): outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if it's from an arbitary point in the context as opposed to part of the current block of PDUs. Defaults to `False` + timeout (int): How long to try (in ms) each destination for before + moving to the next destination. None indicates no timeout. Returns: Deferred: Results in the requested PDU. diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index c2b53b78b2..610a4c3163 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -57,6 +57,8 @@ class TransportLayerClient(object): destination (str): The host name of the remote home server we want to get the state from. event_id (str): The id of the event being requested. + timeout (int): How long to try (in ms) the destination for before + giving up. None indicates no timeout. Returns: Deferred: Results in a dict received from the remote homeserver. diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 312bbcc6b8..6f976d5ce8 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -345,6 +345,9 @@ class MatrixFederationHttpClient(object): path (str): The HTTP path. args (dict): A dictionary used to create query strings, defaults to None. + timeout (int): How long to try (in ms) the destination for before + giving up. None indicates no timeout and that the request will + be retried. Returns: Deferred: Succeeds when we get *any* HTTP response. 
-- cgit 1.5.1 From 6eadbfbea0f8eb742f94d73e262631b0877e3dee Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 May 2015 16:12:20 +0100 Subject: Remove redundant for loop --- synapse/federation/federation_client.py | 3 --- 1 file changed, 3 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index cbb9d354b6..d3b46b24c1 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -165,9 +165,6 @@ class FederationClient(FederationBase): for p in transaction_data["pdus"] ] - for i, pdu in enumerate(pdus): - pdus[i] = yield self._check_sigs_and_hash(pdu) - # FIXME: We should handle signature failures more gracefully. pdus[:] = yield defer.gatherResults( [self._check_sigs_and_hash(pdu) for pdu in pdus], -- cgit 1.5.1
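
The common thread in the gatherResults changes above is error unwrapping: with consumeErrors=True, defer.gatherResults reports a failing branch as a defer.FirstError, so callers would otherwise lose the original exception (for example a SynapseError raised while checking signatures). The unwrapFirstError helper added to synapse/util/__init__.py traps the FirstError and surfaces the wrapped failure instead. Below is a minimal, self-contained sketch of the pattern; it is not Synapse code, and fetch_all and fetchers are hypothetical names used only for illustration — unwrapFirstError itself is copied from the synapse/util hunk above.

    from twisted.internet import defer


    def unwrapFirstError(failure):
        # defer.gatherResults and DeferredLists wrap failures in FirstError;
        # re-raise anything else, otherwise surface the wrapped failure.
        failure.trap(defer.FirstError)
        return failure.value.subFailure


    @defer.inlineCallbacks
    def fetch_all(fetchers):
        # `fetchers`: a list of zero-argument callables, each returning a Deferred.
        # Run them concurrently; if any fails, the caller sees the original
        # exception rather than a wrapping defer.FirstError.
        results = yield defer.gatherResults(
            [f() for f in fetchers],
            consumeErrors=True,
        ).addErrback(unwrapFirstError)
        defer.returnValue(results)

Used this way, yield fetch_all([...]) inside another inlineCallbacks function raises the first underlying exception directly, which is what the .addErrback(unwrapFirstError) calls added throughout these commits rely on.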