diff options
47 files changed, 333 insertions, 112 deletions
diff --git a/.circleci/config.yml b/.circleci/config.yml index 521aca22ef..6ae3a42235 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -1,5 +1,27 @@ version: 2 jobs: + dockerhubuploadrelease: + machine: true + steps: + - checkout + - run: docker build -f docker/Dockerfile -t matrixdotorg/synapse:${CIRCLE_TAG} . + - run: docker build -f docker/Dockerfile -t matrixdotorg/synapse:${CIRCLE_TAG}-py3 --build-arg PYTHON_VERSION=3.6 . + - run: docker login --username $DOCKER_HUB_USERNAME --password $DOCKER_HUB_PASSWORD + - run: docker push matrixdotorg/synapse:${CIRCLE_TAG} + - run: docker push matrixdotorg/synapse:${CIRCLE_TAG}-py3 + dockerhubuploadlatest: + machine: true + steps: + - checkout + - run: docker build -f docker/Dockerfile -t matrixdotorg/synapse:${CIRCLE_SHA1} . + - run: docker build -f docker/Dockerfile -t matrixdotorg/synapse:${CIRCLE_SHA1}-py3 --build-arg PYTHON_VERSION=3.6 . + - run: docker login --username $DOCKER_HUB_USERNAME --password $DOCKER_HUB_PASSWORD + - run: docker tag matrixdotorg/synapse:${CIRCLE_SHA1} matrixdotorg/synapse:latest + - run: docker tag matrixdotorg/synapse:${CIRCLE_SHA1}-py3 matrixdotorg/synapse:latest-py3 + - run: docker push matrixdotorg/synapse:${CIRCLE_SHA1} + - run: docker push matrixdotorg/synapse:${CIRCLE_SHA1}-py3 + - run: docker push matrixdotorg/synapse:latest + - run: docker push matrixdotorg/synapse:latest-py3 sytestpy2: machine: true steps: @@ -131,3 +153,13 @@ workflows: filters: branches: ignore: /develop|master|release-.*/ + - dockerhubuploadrelease: + filters: + tags: + only: /^v[0-9].[0-9]+.[0-9]+(.[0-9]+)?/ + branches: + ignore: /.*/ + - dockerhubuploadlatest: + filters: + branches: + only: master diff --git a/.travis.yml b/.travis.yml index b6faca4b92..2077f6af72 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,6 +21,9 @@ matrix: env: TOX_ENV=py27 - python: 2.7 + env: TOX_ENV=py27-old + + - python: 2.7 env: TOX_ENV=py27-postgres TRIAL_FLAGS="-j 4" services: - postgresql diff --git a/changelog.d/3794.misc b/changelog.d/3794.misc new file mode 100644 index 0000000000..6b98c9609b --- /dev/null +++ b/changelog.d/3794.misc @@ -0,0 +1 @@ +Speed up calculation of typing updates for replication diff --git a/changelog.d/3946.misc b/changelog.d/3946.misc new file mode 100644 index 0000000000..803857a297 --- /dev/null +++ b/changelog.d/3946.misc @@ -0,0 +1 @@ +Automate pushes to docker hub diff --git a/changelog.d/3952.misc b/changelog.d/3952.misc new file mode 100644 index 0000000000..015e4a43e6 --- /dev/null +++ b/changelog.d/3952.misc @@ -0,0 +1 @@ +Run the test suite on the oldest supported versions of our dependencies in CI. \ No newline at end of file diff --git a/changelog.d/3958.misc b/changelog.d/3958.misc new file mode 100644 index 0000000000..5931d06dcf --- /dev/null +++ b/changelog.d/3958.misc @@ -0,0 +1 @@ +Fix docstrings and add tests for state store methods diff --git a/changelog.d/3959.feature b/changelog.d/3959.feature new file mode 100644 index 0000000000..b3a4f37a8d --- /dev/null +++ b/changelog.d/3959.feature @@ -0,0 +1 @@ +Include eventid in log lines when processing incoming federation transactions \ No newline at end of file diff --git a/changelog.d/3961.bugfix b/changelog.d/3961.bugfix new file mode 100644 index 0000000000..e46b5834aa --- /dev/null +++ b/changelog.d/3961.bugfix @@ -0,0 +1 @@ +Fix errors due to concurrent monthly_active_user upserts diff --git a/changelog.d/3963.misc b/changelog.d/3963.misc new file mode 100644 index 0000000000..f1e0eaf18e --- /dev/null +++ b/changelog.d/3963.misc @@ -0,0 +1 @@ +fix docstring for FederationClient.get_state_for_room diff --git a/changelog.d/3965.misc b/changelog.d/3965.misc new file mode 100644 index 0000000000..e7e4a9c5a8 --- /dev/null +++ b/changelog.d/3965.misc @@ -0,0 +1 @@ +Run notify_app_services as a bg process diff --git a/changelog.d/3966.misc b/changelog.d/3966.misc new file mode 100644 index 0000000000..1e3c8e1706 --- /dev/null +++ b/changelog.d/3966.misc @@ -0,0 +1 @@ +Improve the logging when handling a federation transaction \ No newline at end of file diff --git a/changelog.d/3967.misc b/changelog.d/3967.misc new file mode 100644 index 0000000000..dc808aec73 --- /dev/null +++ b/changelog.d/3967.misc @@ -0,0 +1 @@ +Clarifications in FederationHandler diff --git a/changelog.d/3970.bugfix b/changelog.d/3970.bugfix new file mode 100644 index 0000000000..5625315497 --- /dev/null +++ b/changelog.d/3970.bugfix @@ -0,0 +1 @@ +Replaced all occurences of e.message with str(e). Contributed by Schnuffle diff --git a/changelog.d/3976.misc b/changelog.d/3976.misc new file mode 100644 index 0000000000..282148c986 --- /dev/null +++ b/changelog.d/3976.misc @@ -0,0 +1 @@ +Build py3 docker images for docker hub too diff --git a/changelog.d/3980.bugfix b/changelog.d/3980.bugfix new file mode 100644 index 0000000000..7578414ede --- /dev/null +++ b/changelog.d/3980.bugfix @@ -0,0 +1 @@ +Fix some instances of ExpiringCache not expiring cache items diff --git a/scripts-dev/dump_macaroon.py b/scripts-dev/dump_macaroon.py index 6e45be75d6..fcc5568835 100755 --- a/scripts-dev/dump_macaroon.py +++ b/scripts-dev/dump_macaroon.py @@ -21,4 +21,4 @@ try: verifier.verify(macaroon, key) print "Signature is correct" except Exception as e: - print e.message + print str(e) diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index a31a9a17e0..eed8c67e6a 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -226,7 +226,7 @@ class Filtering(object): jsonschema.validate(user_filter_json, USER_FILTER_SCHEMA, format_checker=FormatChecker()) except jsonschema.ValidationError as e: - raise SynapseError(400, e.message) + raise SynapseError(400, str(e)) class FilterCollection(object): diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py index 3b6b9368b8..c3afcc573b 100644 --- a/synapse/app/__init__.py +++ b/synapse/app/__init__.py @@ -24,7 +24,7 @@ try: python_dependencies.check_requirements() except python_dependencies.MissingRequirementError as e: message = "\n".join([ - "Missing Requirement: %s" % (e.message,), + "Missing Requirement: %s" % (str(e),), "To install run:", " pip install --upgrade --force \"%s\"" % (e.dependency,), "", diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 02039f7e79..8559e141af 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -136,7 +136,7 @@ def start(config_options): "Synapse appservice", config_options ) except ConfigError as e: - sys.stderr.write("\n" + e.message + "\n") + sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) assert config.worker_app == "synapse.app.appservice" diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 4c73c637bb..76aed8c60a 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -153,7 +153,7 @@ def start(config_options): "Synapse client reader", config_options ) except ConfigError as e: - sys.stderr.write("\n" + e.message + "\n") + sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) assert config.worker_app == "synapse.app.client_reader" diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index bc82197b2a..9060ab14f6 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -169,7 +169,7 @@ def start(config_options): "Synapse event creator", config_options ) except ConfigError as e: - sys.stderr.write("\n" + e.message + "\n") + sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) assert config.worker_app == "synapse.app.event_creator" diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 18ca71ef99..228a297fb8 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -140,7 +140,7 @@ def start(config_options): "Synapse federation reader", config_options ) except ConfigError as e: - sys.stderr.write("\n" + e.message + "\n") + sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) assert config.worker_app == "synapse.app.federation_reader" diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 6501c57792..e9a99d76e1 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -160,7 +160,7 @@ def start(config_options): "Synapse federation sender", config_options ) except ConfigError as e: - sys.stderr.write("\n" + e.message + "\n") + sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) assert config.worker_app == "synapse.app.federation_sender" diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index b076fbe522..fc4b25de1c 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -228,7 +228,7 @@ def start(config_options): "Synapse frontend proxy", config_options ) except ConfigError as e: - sys.stderr.write("\n" + e.message + "\n") + sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) assert config.worker_app == "synapse.app.frontend_proxy" diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 8c5d858b0b..e3f0d99a3f 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -301,7 +301,7 @@ class SynapseHomeServer(HomeServer): try: database_engine.check_database(db_conn.cursor()) except IncorrectDatabaseSetup as e: - quit_with_error(e.message) + quit_with_error(str(e)) # Gauges to expose monthly active user control metrics @@ -328,7 +328,7 @@ def setup(config_options): config_options, ) except ConfigError as e: - sys.stderr.write("\n" + e.message + "\n") + sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) if not config: @@ -386,7 +386,6 @@ def setup(config_options): hs.get_pusherpool().start() hs.get_datastore().start_profiling() hs.get_datastore().start_doing_background_updates() - hs.get_federation_client().start_get_pdu_cache() reactor.callWhenRunning(start) diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index 992d182dba..acc0487adc 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -133,7 +133,7 @@ def start(config_options): "Synapse media repository", config_options ) except ConfigError as e: - sys.stderr.write("\n" + e.message + "\n") + sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) assert config.worker_app == "synapse.app.media_repository" diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 2ec4c7defb..630dcda478 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -191,7 +191,7 @@ def start(config_options): "Synapse pusher", config_options ) except ConfigError as e: - sys.stderr.write("\n" + e.message + "\n") + sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) assert config.worker_app == "synapse.app.pusher" diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index df81b7bcbe..9a7fc6ee9d 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -410,7 +410,7 @@ def start(config_options): "Synapse synchrotron", config_options ) except ConfigError as e: - sys.stderr.write("\n" + e.message + "\n") + sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) assert config.worker_app == "synapse.app.synchrotron" diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index b383e79c1c..0a5f62b509 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -188,7 +188,7 @@ def start(config_options): "Synapse user directory", config_options ) except ConfigError as e: - sys.stderr.write("\n" + e.message + "\n") + sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) assert config.worker_app == "synapse.app.user_dir" diff --git a/synapse/config/__main__.py b/synapse/config/__main__.py index 58c97a70af..8fccf573ee 100644 --- a/synapse/config/__main__.py +++ b/synapse/config/__main__.py @@ -25,7 +25,7 @@ if __name__ == "__main__": try: config = HomeServerConfig.load_config("", sys.argv[3:]) except ConfigError as e: - sys.stderr.write("\n" + e.message + "\n") + sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) print (getattr(config, key)) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 5a92428f56..d05ed91d64 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -209,8 +209,6 @@ class FederationClient(FederationBase): Will attempt to get the PDU from each destination in the list until one succeeds. - This will persist the PDU locally upon receipt. - Args: destinations (list): Which home servers to query event_id (str): event to fetch @@ -289,8 +287,7 @@ class FederationClient(FederationBase): @defer.inlineCallbacks @log_function def get_state_for_room(self, destination, room_id, event_id): - """Requests all of the `current` state PDUs for a given room from - a remote home server. + """Requests all of the room state at a given event from a remote home server. Args: destination (str): The remote homeserver to query for the state. @@ -298,9 +295,10 @@ class FederationClient(FederationBase): event_id (str): The id of the event we want the state at. Returns: - Deferred: Results in a list of PDUs. + Deferred[Tuple[List[EventBase], List[EventBase]]]: + A list of events in the state, and a list of events in the auth chain + for the given event. """ - try: # First we try and ask for just the IDs, as thats far quicker if # we have most of the state and auth_chain already. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 9a571e4fc7..819e8f7331 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -46,6 +46,7 @@ from synapse.replication.http.federation import ( from synapse.types import get_domain_from_id from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.caches.response_cache import ResponseCache +from synapse.util.logcontext import nested_logging_context from synapse.util.logutils import log_function # when processing incoming transactions, we try to handle multiple rooms in @@ -187,21 +188,22 @@ class FederationServer(FederationBase): for pdu in pdus_by_room[room_id]: event_id = pdu.event_id - try: - yield self._handle_received_pdu( - origin, pdu - ) - pdu_results[event_id] = {} - except FederationError as e: - logger.warn("Error handling PDU %s: %s", event_id, e) - pdu_results[event_id] = {"error": str(e)} - except Exception as e: - f = failure.Failure() - pdu_results[event_id] = {"error": str(e)} - logger.error( - "Failed to handle PDU %s: %s", - event_id, f.getTraceback().rstrip(), - ) + with nested_logging_context(event_id): + try: + yield self._handle_received_pdu( + origin, pdu + ) + pdu_results[event_id] = {} + except FederationError as e: + logger.warn("Error handling PDU %s: %s", event_id, e) + pdu_results[event_id] = {"error": str(e)} + except Exception as e: + f = failure.Failure() + pdu_results[event_id] = {"error": str(e)} + logger.error( + "Failed to handle PDU %s: %s", + event_id, f.getTraceback().rstrip(), + ) yield concurrently_execute( process_pdus_for_room, pdus_by_room.keys(), diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 578e9250fb..9dc46aa15f 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -341,7 +341,7 @@ class E2eKeysHandler(object): def _exception_to_failure(e): if isinstance(e, CodeMessageException): return { - "status": e.code, "message": e.message, + "status": e.code, "message": str(e), } if isinstance(e, NotRetryingDestination): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2ccdc3bfa7..38bebbf598 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -339,14 +339,26 @@ class FederationHandler(BaseHandler): "[%s %s] Requesting state at missing prev_event %s", room_id, event_id, p, ) - state, got_auth_chain = ( - yield self.federation_client.get_state_for_room( - origin, room_id, p, + + with logcontext.nested_logging_context(p): + # note that if any of the missing prevs share missing state or + # auth events, the requests to fetch those events are deduped + # by the get_pdu_cache in federation_client. + remote_state, got_auth_chain = ( + yield self.federation_client.get_state_for_room( + origin, room_id, p, + ) ) - ) - auth_chains.update(got_auth_chain) - state_group = {(x.type, x.state_key): x.event_id for x in state} - state_groups.append(state_group) + + # XXX hrm I'm not convinced that duplicate events will compare + # for equality, so I'm not sure this does what the author + # hoped. + auth_chains.update(got_auth_chain) + + state_group = { + (x.type, x.state_key): x.event_id for x in remote_state + } + state_groups.append(state_group) # Resolve any conflicting state def fetch(ev_ids): @@ -483,20 +495,21 @@ class FederationHandler(BaseHandler): "[%s %s] Handling received prev_event %s", room_id, event_id, ev.event_id, ) - try: - yield self.on_receive_pdu( - origin, - ev, - sent_to_us_directly=False, - ) - except FederationError as e: - if e.code == 403: - logger.warn( - "[%s %s] Received prev_event %s failed history check.", - room_id, event_id, ev.event_id, + with logcontext.nested_logging_context(ev.event_id): + try: + yield self.on_receive_pdu( + origin, + ev, + sent_to_us_directly=False, ) - else: - raise + except FederationError as e: + if e.code == 403: + logger.warn( + "[%s %s] Received prev_event %s failed history check.", + room_id, event_id, ev.event_id, + ) + else: + raise @defer.inlineCallbacks def _process_received_pdu(self, origin, event, state, auth_chain): @@ -572,6 +585,10 @@ class FederationHandler(BaseHandler): }) seen_ids.add(e.event_id) + logger.info( + "[%s %s] persisting newly-received auth/state events %s", + room_id, event_id, [e["event"].event_id for e in event_infos] + ) yield self._handle_new_events(origin, event_infos) try: @@ -1135,7 +1152,8 @@ class FederationHandler(BaseHandler): try: logger.info("Processing queued PDU %s which was received " "while we were joining %s", p.event_id, p.room_id) - yield self.on_receive_pdu(origin, p, sent_to_us_directly=True) + with logcontext.nested_logging_context(p.event_id): + yield self.on_receive_pdu(origin, p, sent_to_us_directly=True) except Exception as e: logger.warn( "Error handling queued PDU %s from %s: %s", @@ -1581,15 +1599,22 @@ class FederationHandler(BaseHandler): Notifies about the events where appropriate. """ - contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( - [ - logcontext.run_in_background( - self._prep_event, + + @defer.inlineCallbacks + def prep(ev_info): + event = ev_info["event"] + with logcontext.nested_logging_context(suffix=event.event_id): + res = yield self._prep_event( origin, - ev_info["event"], + event, state=ev_info.get("state"), auth_events=ev_info.get("auth_events"), ) + defer.returnValue(res) + + contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( + [ + logcontext.run_in_background(prep, ev_info) for ev_info in event_infos ], consumeErrors=True, )) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 75b8b7ce6a..f284d5a385 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -278,7 +278,7 @@ class BaseProfileHandler(BaseHandler): except Exception as e: logger.warn( "Failed to update join event for room %s - %s", - room_id, str(e.message) + room_id, str(e) ) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index bf82b3f864..c610933dd4 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -20,6 +20,7 @@ from twisted.internet import defer from synapse.api.errors import AuthError, SynapseError from synapse.types import UserID, get_domain_from_id +from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.logcontext import run_in_background from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer @@ -68,6 +69,11 @@ class TypingHandler(object): # map room IDs to sets of users currently typing self._room_typing = {} + # caches which room_ids changed at which serials + self._typing_stream_change_cache = StreamChangeCache( + "TypingStreamChangeCache", self._latest_room_serial, + ) + self.clock.looping_call( self._handle_timeouts, 5000, @@ -275,19 +281,29 @@ class TypingHandler(object): self._latest_room_serial += 1 self._room_serials[member.room_id] = self._latest_room_serial + self._typing_stream_change_cache.entity_has_changed( + member.room_id, self._latest_room_serial, + ) self.notifier.on_new_event( "typing_key", self._latest_room_serial, rooms=[member.room_id] ) def get_all_typing_updates(self, last_id, current_id): - # TODO: Work out a way to do this without scanning the entire state. if last_id == current_id: return [] + changed_rooms = self._typing_stream_change_cache.get_all_entities_changed( + last_id, + ) + + if changed_rooms is None: + changed_rooms = self._room_serials + rows = [] - for room_id, serial in self._room_serials.items(): - if last_id < serial and serial <= current_id: + for room_id in changed_rooms: + serial = self._room_serials[room_id] + if last_id < serial <= current_id: typing = self._room_typing[room_id] rows.append((serial, room_id, list(typing))) rows.sort() diff --git a/synapse/notifier.py b/synapse/notifier.py index f1d92c1395..340b16ce25 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -24,9 +24,10 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError from synapse.handlers.presence import format_user_presence_state from synapse.metrics import LaterGauge +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import StreamToken from synapse.util.async_helpers import ObservableDeferred, timeout_deferred -from synapse.util.logcontext import PreserveLoggingContext, run_in_background +from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logutils import log_function from synapse.util.metrics import Measure from synapse.visibility import filter_events_for_client @@ -248,7 +249,10 @@ class Notifier(object): def _on_new_room_event(self, event, room_stream_id, extra_users=[]): """Notify any user streams that are interested in this room event""" # poke any interested application service. - run_in_background(self._notify_app_services, room_stream_id) + run_as_background_process( + "notify_app_services", + self._notify_app_services, room_stream_id, + ) if self.federation_sender: self.federation_sender.notify_new_events(room_stream_id) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index c779f69fa0..0f339a0320 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -33,31 +33,32 @@ logger = logging.getLogger(__name__) # [2] https://setuptools.readthedocs.io/en/latest/setuptools.html#declaring-dependencies REQUIREMENTS = { "jsonschema>=2.5.1": ["jsonschema>=2.5.1"], - "frozendict>=0.4": ["frozendict"], + "frozendict>=1": ["frozendict"], "unpaddedbase64>=1.1.0": ["unpaddedbase64>=1.1.0"], "canonicaljson>=1.1.3": ["canonicaljson>=1.1.3"], "signedjson>=1.0.0": ["signedjson>=1.0.0"], "pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"], - "service_identity>=1.0.0": ["service_identity>=1.0.0"], + "service_identity>=16.0.0": ["service_identity>=16.0.0"], "Twisted>=17.1.0": ["twisted>=17.1.0"], "treq>=15.1": ["treq>=15.1"], # Twisted has required pyopenssl 16.0 since about Twisted 16.6. "pyopenssl>=16.0.0": ["OpenSSL>=16.0.0"], - "pyyaml": ["yaml"], - "pyasn1": ["pyasn1"], - "daemonize": ["daemonize"], - "bcrypt": ["bcrypt>=3.1.0"], - "pillow": ["PIL"], - "pydenticon": ["pydenticon"], - "sortedcontainers": ["sortedcontainers"], - "pysaml2>=3.0.0": ["saml2>=3.0.0"], - "pymacaroons-pynacl": ["pymacaroons"], + "pyyaml>=3.11": ["yaml"], + "pyasn1>=0.1.9": ["pyasn1"], + "pyasn1-modules>=0.0.7": ["pyasn1_modules"], + "daemonize>=2.3.1": ["daemonize"], + "bcrypt>=3.1.0": ["bcrypt>=3.1.0"], + "pillow>=3.1.2": ["PIL"], + "pydenticon>=0.2": ["pydenticon"], + "sortedcontainers>=1.4.4": ["sortedcontainers"], + "pysaml2>=3.0.0": ["saml2"], + "pymacaroons-pynacl>=0.9.3": ["pymacaroons"], "msgpack-python>=0.3.0": ["msgpack"], "phonenumbers>=8.2.0": ["phonenumbers"], - "six": ["six"], - "prometheus_client": ["prometheus_client"], + "six>=1.10": ["six"], + "prometheus_client>=0.0.18": ["prometheus_client"], # we use attr.s(slots), which arrived in 16.0.0 "attrs>=16.0.0": ["attr>=16.0.0"], diff --git a/synapse/state/v1.py b/synapse/state/v1.py index c95477d318..7a7157b352 100644 --- a/synapse/state/v1.py +++ b/synapse/state/v1.py @@ -65,10 +65,15 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory): for event_ids in itervalues(conflicted_state) for event_id in event_ids ) + needed_event_count = len(needed_events) if event_map is not None: needed_events -= set(iterkeys(event_map)) - logger.info("Asking for %d conflicted events", len(needed_events)) + logger.info( + "Asking for %d/%d conflicted events", + len(needed_events), + needed_event_count, + ) # dict[str, FrozenEvent]: a map from state event id to event. Only includes # the state events which are in conflict (and those in event_map) @@ -85,11 +90,16 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory): ) new_needed_events = set(itervalues(auth_events)) + new_needed_event_count = len(new_needed_events) new_needed_events -= needed_events if event_map is not None: new_needed_events -= set(iterkeys(event_map)) - logger.info("Asking for %d auth events", len(new_needed_events)) + logger.info( + "Asking for %d/%d auth events", + len(new_needed_events), + new_needed_event_count, + ) state_map_new = yield state_map_factory(new_needed_events) state_map.update(state_map_new) diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py index 59580949f1..0fe8c8e24c 100644 --- a/synapse/storage/monthly_active_users.py +++ b/synapse/storage/monthly_active_users.py @@ -172,6 +172,10 @@ class MonthlyActiveUsersStore(SQLBaseStore): Deferred[bool]: True if a new entry was created, False if an existing one was updated. """ + # Am consciously deciding to lock the table on the basis that is ought + # never be a big table and alternative approaches (batching multiple + # upserts into a single txn) introduced a lot of extra complexity. + # See https://github.com/matrix-org/synapse/issues/3854 for more is_insert = yield self._simple_upsert( desc="upsert_monthly_active_user", table="monthly_active_users", @@ -181,7 +185,6 @@ class MonthlyActiveUsersStore(SQLBaseStore): values={ "timestamp": int(self._clock.time_msec()), }, - lock=False, ) if is_insert: self.user_last_seen_monthly_active.invalidate((user_id,)) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 4b971efdba..3f4cbd61c4 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -255,7 +255,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): ) @defer.inlineCallbacks - def get_state_groups_ids(self, room_id, event_ids): + def get_state_groups_ids(self, _room_id, event_ids): + """Get the event IDs of all the state for the state groups for the given events + + Args: + _room_id (str): id of the room for these events + event_ids (iterable[str]): ids of the events + + Returns: + Deferred[dict[int, dict[tuple[str, str], str]]]: + dict of state_group_id -> (dict of (type, state_key) -> event id) + """ if not event_ids: defer.returnValue({}) @@ -270,7 +280,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): @defer.inlineCallbacks def get_state_ids_for_group(self, state_group): - """Get the state IDs for the given state group + """Get the event IDs of all the state in the given state group Args: state_group (int) @@ -286,7 +296,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): def get_state_groups(self, room_id, event_ids): """ Get the state groups for the given list of event_ids - The return value is a dict mapping group names to lists of events. + Returns: + Deferred[dict[int, list[EventBase]]]: + dict of state_group_id -> list of state events. """ if not event_ids: defer.returnValue({}) @@ -324,7 +336,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): member events (if True), or to exclude member events (if False) Returns: - dictionary state_group -> (dict of (type, state_key) -> event id) + Returns: + Deferred[dict[int, dict[tuple[str, str], str]]]: + dict of state_group_id -> (dict of (type, state_key) -> event id) """ results = {} @@ -732,8 +746,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): If None, `types` filtering is applied to all events. Returns: - Deferred[dict[int, dict[(type, state_key), EventBase]]] - a dictionary mapping from state group to state dictionary. + Deferred[dict[int, dict[tuple[str, str], str]]]: + dict of state_group_id -> (dict of (type, state_key) -> event id) """ if types is not None: non_member_types = [t for t in types if t[0] != EventTypes.Member] @@ -788,8 +802,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): If None, `types` filtering is applied to all events. Returns: - Deferred[dict[int, dict[(type, state_key), EventBase]]] - a dictionary mapping from state group to state dictionary. + Deferred[dict[int, dict[tuple[str, str], str]]]: + dict of state_group_id -> (dict of (type, state_key) -> event id) """ if types: types = frozenset(types) diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index a0c2d37610..89224b26cc 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -200,7 +200,7 @@ class LoggingContext(object): sentinel = Sentinel() - def __init__(self, name=None, parent_context=None): + def __init__(self, name=None, parent_context=None, request=None): self.previous_context = LoggingContext.current_context() self.name = name @@ -218,6 +218,13 @@ class LoggingContext(object): self.parent_context = parent_context + if self.parent_context is not None: + self.parent_context.copy_to(self) + + if request is not None: + # the request param overrides the request from the parent context + self.request = request + def __str__(self): return "%s@%x" % (self.name, id(self)) @@ -256,9 +263,6 @@ class LoggingContext(object): ) self.alive = True - if self.parent_context is not None: - self.parent_context.copy_to(self) - return self def __exit__(self, type, value, traceback): @@ -439,6 +443,35 @@ class PreserveLoggingContext(object): ) +def nested_logging_context(suffix, parent_context=None): + """Creates a new logging context as a child of another. + + The nested logging context will have a 'request' made up of the parent context's + request, plus the given suffix. + + CPU/db usage stats will be added to the parent context's on exit. + + Normal usage looks like: + + with nested_logging_context(suffix): + # ... do stuff + + Args: + suffix (str): suffix to add to the parent context's 'request'. + parent_context (LoggingContext|None): parent context. Will use the current context + if None. + + Returns: + LoggingContext: new logging context. + """ + if parent_context is None: + parent_context = LoggingContext.current_context() + return LoggingContext( + parent_context=parent_context, + request=parent_context.request + "-" + suffix, + ) + + def preserve_fn(f): """Function decorator which wraps the function with run_in_background""" def g(*args, **kwargs): diff --git a/tests/storage/test_state.py b/tests/storage/test_state.py index b910965932..b9c5b39d59 100644 --- a/tests/storage/test_state.py +++ b/tests/storage/test_state.py @@ -75,6 +75,45 @@ class StateStoreTestCase(tests.unittest.TestCase): self.assertEqual(len(s1), len(s2)) @defer.inlineCallbacks + def test_get_state_groups_ids(self): + e1 = yield self.inject_state_event( + self.room, self.u_alice, EventTypes.Create, '', {} + ) + e2 = yield self.inject_state_event( + self.room, self.u_alice, EventTypes.Name, '', {"name": "test room"} + ) + + state_group_map = yield self.store.get_state_groups_ids(self.room, [e2.event_id]) + self.assertEqual(len(state_group_map), 1) + state_map = list(state_group_map.values())[0] + self.assertDictEqual( + state_map, + { + (EventTypes.Create, ''): e1.event_id, + (EventTypes.Name, ''): e2.event_id, + }, + ) + + @defer.inlineCallbacks + def test_get_state_groups(self): + e1 = yield self.inject_state_event( + self.room, self.u_alice, EventTypes.Create, '', {} + ) + e2 = yield self.inject_state_event( + self.room, self.u_alice, EventTypes.Name, '', {"name": "test room"} + ) + + state_group_map = yield self.store.get_state_groups( + self.room, [e2.event_id]) + self.assertEqual(len(state_group_map), 1) + state_list = list(state_group_map.values())[0] + + self.assertEqual( + {ev.event_id for ev in state_list}, + {e1.event_id, e2.event_id}, + ) + + @defer.inlineCallbacks def test_get_state_for_event(self): # this defaults to a linear DAG as each new injection defaults to whatever diff --git a/tests/test_federation.py b/tests/test_federation.py index 2540604fcc..ff55c7a627 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -6,6 +6,7 @@ from twisted.internet.defer import maybeDeferred, succeed from synapse.events import FrozenEvent from synapse.types import Requester, UserID from synapse.util import Clock +from synapse.util.logcontext import LoggingContext from tests import unittest from tests.server import ThreadedMemoryReactorClock, setup_test_homeserver @@ -117,9 +118,10 @@ class MessageAcceptTests(unittest.TestCase): } ) - d = self.handler.on_receive_pdu( - "test.serv", lying_event, sent_to_us_directly=True - ) + with LoggingContext(request="lying_event"): + d = self.handler.on_receive_pdu( + "test.serv", lying_event, sent_to_us_directly=True + ) # Step the reactor, so the database fetches come back self.reactor.advance(1) @@ -209,11 +211,12 @@ class MessageAcceptTests(unittest.TestCase): } ) - d = self.handler.on_receive_pdu( - "test.serv", good_event, sent_to_us_directly=True - ) - self.reactor.advance(1) - self.assertEqual(self.successResultOf(d), None) + with LoggingContext(request="good_event"): + d = self.handler.on_receive_pdu( + "test.serv", good_event, sent_to_us_directly=True + ) + self.reactor.advance(1) + self.assertEqual(self.successResultOf(d), None) bad_event = FrozenEvent( { @@ -230,10 +233,11 @@ class MessageAcceptTests(unittest.TestCase): } ) - d = self.handler.on_receive_pdu( - "test.serv", bad_event, sent_to_us_directly=True - ) - self.reactor.advance(1) + with LoggingContext(request="bad_event"): + d = self.handler.on_receive_pdu( + "test.serv", bad_event, sent_to_us_directly=True + ) + self.reactor.advance(1) extrem = maybeDeferred( self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id diff --git a/tests/unittest.py b/tests/unittest.py index ef905e6389..043710afaf 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -121,7 +121,7 @@ class TestCase(unittest.TestCase): try: self.assertEquals(attrs[key], getattr(obj, key)) except AssertionError as e: - raise (type(e))(e.message + " for '.%s'" % key) + raise (type(e))(str(e) + " for '.%s'" % key) def assert_dict(self, required, actual): """Does a partial assert of a dict. diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index 4633db77b3..8adaee3c8d 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -159,6 +159,11 @@ class LoggingContextTestCase(unittest.TestCase): self.assertEqual(r, "bum") self._check_test_key("one") + def test_nested_logging_context(self): + with LoggingContext(request="foo"): + nested_context = logcontext.nested_logging_context(suffix="bar") + self.assertEqual(nested_context.request, "foo-bar") + # a function which returns a deferred which has been "called", but # which had a function which returned another incomplete deferred on diff --git a/tox.ini b/tox.ini index e4db563b4b..87b5e4782d 100644 --- a/tox.ini +++ b/tox.ini @@ -64,6 +64,26 @@ setenv = {[base]setenv} SYNAPSE_POSTGRES = 1 +# A test suite for the oldest supported versions of Python libraries, to catch +# any uses of APIs not available in them. +[testenv:py27-old] +skip_install=True +deps = + # Old automat version for Twisted + Automat == 0.3.0 + + mock + lxml +commands = + /usr/bin/find "{toxinidir}" -name '*.pyc' -delete + # Make all greater-thans equals so we test the oldest version of our direct + # dependencies, but make the pyopenssl 17.0, which can work against an + # OpenSSL 1.1 compiled cryptography (as older ones don't compile on Travis). + /bin/sh -c 'python -m synapse.python_dependencies | sed -e "s/>=/==/g" -e "s/psycopg2==2.6//" -e "s/pyopenssl==16.0.0/pyopenssl==17.0.0/" | xargs pip install' + # Install Synapse itself. This won't update any libraries. + pip install -e . + {envbindir}/trial {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:} + [testenv:py35] usedevelop=true |