From c14593a5ccd7896b041421ee4d3e433a7afc34ac Mon Sep 17 00:00:00 2001 From: fangyue1 Date: Fri, 30 Apr 2021 13:10:25 +0800 Subject: [PATCH] [FLINK-14393][webui] Add an option to enable/disable cancel job in web ui This closes #15817. --- docs/content.zh/docs/deployment/config.md | 1 + docs/content/docs/deployment/config.md | 1 + .../generated/rest_v1_dispatcher.html | 3 +++ .../generated/web_configuration.html | 6 +++++ .../flink/configuration/WebOptions.java | 8 ++++++ flink-dist/src/main/resources/flink-conf.yaml | 5 ++++ .../webmonitor/history/HistoryServer.java | 2 +- .../src/test/resources/rest_api_v1.snapshot | 3 +++ .../src/app/interfaces/configuration.ts | 1 + .../job/status/job-status.component.html | 2 +- .../pages/job/status/job-status.component.ts | 6 +++-- .../handler/RestHandlerConfiguration.java | 14 ++++++++-- .../cluster/DashboardConfigHandler.java | 6 +++-- .../rest/messages/DashboardConfiguration.java | 27 +++++++++++++++---- .../webmonitor/WebMonitorEndpoint.java | 3 ++- .../handler/RestHandlerConfigurationTest.java | 19 +++++++++++++ .../messages/DashboardConfigurationTest.java | 7 ++++- 17 files changed, 99 insertions(+), 15 deletions(-) diff --git a/docs/content.zh/docs/deployment/config.md b/docs/content.zh/docs/deployment/config.md index 92b5903fc9387..9b7df0dc0eba2 100644 --- a/docs/content.zh/docs/deployment/config.md +++ b/docs/content.zh/docs/deployment/config.md @@ -80,6 +80,7 @@ You can configure checkpointing directly in code within your Flink job or applic **Web UI** - `web.submit.enable`: Enables uploading and starting jobs through the Flink UI *(true by default)*. Please note that even when this is disabled, session clusters still accept jobs through REST requests (HTTP calls). This flag only guards the feature to upload jobs in the UI. + - `web.cancel.enable`: Enables canceling jobs through the Flink UI *(true by default)*. Please note that even when this is disabled, session clusters still cancel jobs through REST requests (HTTP calls). This flag only guards the feature to cancel jobs in the UI. - `web.upload.dir`: The directory where to store uploaded jobs. Only used when `web.submit.enable` is true. **Other** diff --git a/docs/content/docs/deployment/config.md b/docs/content/docs/deployment/config.md index 3310a15021767..bb9f1f1a34a43 100644 --- a/docs/content/docs/deployment/config.md +++ b/docs/content/docs/deployment/config.md @@ -80,6 +80,7 @@ You can configure checkpointing directly in code within your Flink job or applic **Web UI** - `web.submit.enable`: Enables uploading and starting jobs through the Flink UI *(true by default)*. Please note that even when this is disabled, session clusters still accept jobs through REST requests (HTTP calls). This flag only guards the feature to upload jobs in the UI. + - `web.cancel.enable`: Enables canceling jobs through the Flink UI *(true by default)*. Please note that even when this is disabled, session clusters still cancel jobs through REST requests (HTTP calls). This flag only guards the feature to cancel jobs in the UI. - `web.upload.dir`: The directory where to store uploaded jobs. Only used when `web.submit.enable` is true. - `web.exception-history-size`: Sets the size of the exception history that prints the most recent failures that were handled by Flink for a job. diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html index fca22125d0e8d..c69604ab9b228 100644 --- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html +++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html @@ -103,6 +103,9 @@ "properties" : { "web-submit" : { "type" : "boolean" + }, + "web-cancel" : { + "type" : "boolean" } } }, diff --git a/docs/layouts/shortcodes/generated/web_configuration.html b/docs/layouts/shortcodes/generated/web_configuration.html index 145d10088e5cd..7562a5e4b1888 100644 --- a/docs/layouts/shortcodes/generated/web_configuration.html +++ b/docs/layouts/shortcodes/generated/web_configuration.html @@ -14,6 +14,12 @@ String Access-Control-Allow-Origin header for all responses from the web-frontend. + +
web.cancel.enable
+ true + Boolean + Flag indicating whether jobs can be canceled from the web-frontend. +
web.checkpoints.history
10 diff --git a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java index a7a6d0ea0f590..ab4bcb6fc9be0 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java @@ -117,6 +117,14 @@ public class WebOptions { .withDescription( "Flag indicating whether jobs can be uploaded and run from the web-frontend."); + /** Config parameter indicating whether jobs can be cancel from the web-frontend. */ + public static final ConfigOption CANCEL_ENABLE = + key("web.cancel.enable") + .booleanType() + .defaultValue(true) + .withDescription( + "Flag indicating whether jobs can be canceled from the web-frontend."); + /** Config parameter defining the number of checkpoints to remember for recent history. */ public static final ConfigOption CHECKPOINTS_HISTORY_SIZE = key("web.checkpoints.history") diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml index 39885ac5fa78d..419f08177a760 100644 --- a/flink-dist/src/main/resources/flink-conf.yaml +++ b/flink-dist/src/main/resources/flink-conf.yaml @@ -160,6 +160,11 @@ jobmanager.execution.failover-strategy: region #web.submit.enable: false +# Flag to specify whether job cancellation is enabled from the web-based +# runtime monitor. Uncomment to disable. + +#web.cancel.enable: false + #============================================================================== # Advanced #============================================================================== diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java index 11202b0d3755f..384dda02430c8 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java @@ -327,7 +327,7 @@ private void createDashboardConfigFile() throws IOException { fw.write( createConfigJson( DashboardConfiguration.from( - webRefreshIntervalMillis, ZonedDateTime.now(), false))); + webRefreshIntervalMillis, ZonedDateTime.now(), false, false))); fw.flush(); } catch (IOException ioe) { LOG.error("Failed to write config file."); diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index bf1f19053bb01..82ffba924d4bf 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -55,6 +55,9 @@ "properties" : { "web-submit" : { "type" : "boolean" + }, + "web-cancel" : { + "type" : "boolean" } } } diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts index a7eb3129ede6f..dbcb70ab98ec2 100644 --- a/flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts +++ b/flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts @@ -24,5 +24,6 @@ export interface ConfigurationInterface { 'flink-revision': string; features: { 'web-submit': boolean; + 'web-cancel': boolean; }; } diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/status/job-status.component.html b/flink-runtime-web/web-dashboard/src/app/pages/job/status/job-status.component.html index 9a9d9b75834f7..9be07c999f85c 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/status/job-status.component.html +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/status/job-status.component.html @@ -51,7 +51,7 @@

{{ jobDetail.name }}

{{ statusTips }} - Cancel Job + Cancel Job
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/status/job-status.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/status/job-status.component.ts index 43060356d7d3e..507430e05a1e7 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/status/job-status.component.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/status/job-status.component.ts @@ -20,7 +20,7 @@ import { ChangeDetectionStrategy, ChangeDetectorRef, Component, Input, OnDestroy import { JobDetailCorrectInterface } from 'interfaces'; import { Subject } from 'rxjs'; import { distinctUntilKeyChanged, takeUntil } from 'rxjs/operators'; -import { JobService } from 'services'; +import {JobService, StatusService} from 'services'; @Component({ selector: 'flink-job-status', @@ -56,6 +56,8 @@ export class JobStatusComponent implements OnInit, OnDestroy { } ]; + webCancelEnabled = this.statusService.configuration.features["web-cancel"]; + cancelJob() { this.jobService.cancelJob(this.jobDetail.jid).subscribe(() => { this.statusTips = 'Cancelling...'; @@ -63,7 +65,7 @@ export class JobStatusComponent implements OnInit, OnDestroy { }); } - constructor(private jobService: JobService, private cdr: ChangeDetectorRef) {} + constructor(private jobService: JobService, public statusService: StatusService, private cdr: ChangeDetectorRef) {} ngOnInit() { const jobDetail$ = this.jobService.jobDetail$.pipe(takeUntil(this.destroy$)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java index 336fcefa2e4df..c163d8020082a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java @@ -38,12 +38,15 @@ public class RestHandlerConfiguration { private final boolean webSubmitEnabled; + private final boolean webCancelEnabled; + public RestHandlerConfiguration( long refreshInterval, int maxCheckpointStatisticCacheEntries, Time timeout, File webUiDir, - boolean webSubmitEnabled) { + boolean webSubmitEnabled, + boolean webCancelEnabled) { Preconditions.checkArgument( refreshInterval > 0L, "The refresh interval (ms) should be larger than 0."); this.refreshInterval = refreshInterval; @@ -53,6 +56,7 @@ public RestHandlerConfiguration( this.timeout = Preconditions.checkNotNull(timeout); this.webUiDir = Preconditions.checkNotNull(webUiDir); this.webSubmitEnabled = webSubmitEnabled; + this.webCancelEnabled = webCancelEnabled; } public long getRefreshInterval() { @@ -75,6 +79,10 @@ public boolean isWebSubmitEnabled() { return webSubmitEnabled; } + public boolean isWebCancelEnabled() { + return webCancelEnabled; + } + public static RestHandlerConfiguration fromConfiguration(Configuration configuration) { final long refreshInterval = configuration.getLong(WebOptions.REFRESH_INTERVAL); @@ -87,12 +95,14 @@ public static RestHandlerConfiguration fromConfiguration(Configuration configura final File webUiDir = new File(configuration.getString(WebOptions.TMP_DIR), rootDir); final boolean webSubmitEnabled = configuration.getBoolean(WebOptions.SUBMIT_ENABLE); + final boolean webCancelEnabled = configuration.getBoolean(WebOptions.CANCEL_ENABLE); return new RestHandlerConfiguration( refreshInterval, maxCheckpointStatisticCacheEntries, timeout, webUiDir, - webSubmitEnabled); + webSubmitEnabled, + webCancelEnabled); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java index 212666c0a717f..284db86d81835 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java @@ -48,11 +48,13 @@ public DashboardConfigHandler( MessageHeaders messageHeaders, long refreshInterval, - boolean webSubmitEnabled) { + boolean webSubmitEnabled, + boolean webCancelEnabled) { super(leaderRetriever, timeout, responseHeaders, messageHeaders); dashboardConfiguration = - DashboardConfiguration.from(refreshInterval, ZonedDateTime.now(), webSubmitEnabled); + DashboardConfiguration.from( + refreshInterval, ZonedDateTime.now(), webSubmitEnabled, webCancelEnabled); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java index 0cfd0a05f970b..b124756cc0ed7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java @@ -46,6 +46,8 @@ public class DashboardConfiguration implements ResponseBody { public static final String FIELD_NAME_FEATURE_WEB_SUBMIT = "web-submit"; + public static final String FIELD_NAME_FEATURE_WEB_CANCEL = "web-cancel"; + @JsonProperty(FIELD_NAME_REFRESH_INTERVAL) private final long refreshInterval; @@ -116,9 +118,15 @@ public static final class Features { @JsonProperty(FIELD_NAME_FEATURE_WEB_SUBMIT) private final boolean webSubmitEnabled; + @JsonProperty(FIELD_NAME_FEATURE_WEB_CANCEL) + private final boolean webCancelEnabled; + @JsonCreator - public Features(@JsonProperty(FIELD_NAME_FEATURE_WEB_SUBMIT) boolean webSubmitEnabled) { + public Features( + @JsonProperty(FIELD_NAME_FEATURE_WEB_SUBMIT) boolean webSubmitEnabled, + @JsonProperty(FIELD_NAME_FEATURE_WEB_CANCEL) boolean webCancelEnabled) { this.webSubmitEnabled = webSubmitEnabled; + this.webCancelEnabled = webCancelEnabled; } @JsonIgnore @@ -126,6 +134,11 @@ public boolean isWebSubmitEnabled() { return webSubmitEnabled; } + @JsonIgnore + public boolean isWebCancelEnabled() { + return webCancelEnabled; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -135,12 +148,13 @@ public boolean equals(Object o) { return false; } Features features = (Features) o; - return webSubmitEnabled == features.webSubmitEnabled; + return webSubmitEnabled == features.webSubmitEnabled + && webCancelEnabled == features.webCancelEnabled; } @Override public int hashCode() { - return Objects.hash(webSubmitEnabled); + return Objects.hash(webSubmitEnabled, webCancelEnabled); } } @@ -173,7 +187,10 @@ public int hashCode() { } public static DashboardConfiguration from( - long refreshInterval, ZonedDateTime zonedDateTime, boolean webSubmitEnabled) { + long refreshInterval, + ZonedDateTime zonedDateTime, + boolean webSubmitEnabled, + boolean webCancelEnabled) { final String flinkVersion = EnvironmentInformation.getVersion(); @@ -195,6 +212,6 @@ public static DashboardConfiguration from( zonedDateTime.toOffsetDateTime().getOffset().getTotalSeconds() * 1000, flinkVersion, flinkRevision, - new Features(webSubmitEnabled)); + new Features(webSubmitEnabled, webCancelEnabled)); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 6794448c968b7..8bcf0cf088159 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -285,7 +285,8 @@ protected List> initiali responseHeaders, DashboardConfigurationHeaders.getInstance(), restConfiguration.getRefreshInterval(), - hasWebSubmissionHandlers); + hasWebSubmissionHandlers, + restConfiguration.isWebCancelEnabled()); JobIdsHandler jobIdsHandler = new JobIdsHandler( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java index 8caa04b9b459c..43126e5a0d26f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java @@ -39,6 +39,16 @@ public void testWebSubmitFeatureFlagDisabled() { testWebSubmitFeatureFlag(false); } + @Test + public void testWebCancelFeatureFlagEnabled() { + testWebCancelFeatureFlag(true); + } + + @Test + public void testWebCancelFeatureFlagDisabled() { + testWebCancelFeatureFlag(false); + } + private static void testWebSubmitFeatureFlag(boolean webSubmitEnabled) { final Configuration config = new Configuration(); config.setBoolean(WebOptions.SUBMIT_ENABLE, webSubmitEnabled); @@ -47,4 +57,13 @@ private static void testWebSubmitFeatureFlag(boolean webSubmitEnabled) { RestHandlerConfiguration.fromConfiguration(config); assertEquals(webSubmitEnabled, restHandlerConfiguration.isWebSubmitEnabled()); } + + private static void testWebCancelFeatureFlag(boolean webCancelEnabled) { + final Configuration config = new Configuration(); + config.setBoolean(WebOptions.CANCEL_ENABLE, webCancelEnabled); + + RestHandlerConfiguration restHandlerConfiguration = + RestHandlerConfiguration.fromConfiguration(config); + assertEquals(webCancelEnabled, restHandlerConfiguration.isWebCancelEnabled()); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java index 68c234e35c2bb..cdfc3a43a27c5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java @@ -30,6 +30,11 @@ protected Class getTestResponseClass() { @Override protected DashboardConfiguration getTestResponseInstance() { return new DashboardConfiguration( - 1L, "foobar", 42, "version", "revision", new DashboardConfiguration.Features(true)); + 1L, + "foobar", + 42, + "version", + "revision", + new DashboardConfiguration.Features(true, true)); } }