[Bug] [Module Name] Not able to run Jar job in k8s application/k8s operator mode #4181

Open
2 of 3 tasks
1984man opened this issue Jan 24, 2025 · 1 comment
Labels: Bug (Something isn't working), Waiting for reply


1984man commented Jan 24, 2025

Search before asking

  • I have searched the issues and found no similar issues.

What happened

  1. I can run this JAR job successfully in local mode.

  2. I then run the same JAR job in k8s application mode and k8s operator mode.

  3. Running just the Flink demo code with the following parameters also fails.

[Screenshot: job parameters]

My k8s Flink image is built as follows:

ARG FLINK_VERSION=1.19.1
FROM flink:${FLINK_VERSION}-scala_2.12-java8

# Create the target directory for Flink lib
RUN mkdir -p /opt/flink/lib

# Remove existing table planner loader jar and move table planner jar to lib
RUN rm -rf /opt/flink/lib/flink-table-planner-loader-*.jar && \
    mv /opt/flink/opt/flink-table-*.jar /opt/flink/lib


# Download and extract Dinky release
RUN curl -L https://github.com/DataLinkDC/dinky/releases/download/v1.2.1/dinky-release-1.19-1.2.1.tar.gz | tar -xz -C /tmp && \
    mv /tmp/dinky-release-1.19-1.2.1/jar/dinky-app-1.19-1.2.1-jar-with-dependencies.jar /opt/flink/lib/ && \
    rm -rf /tmp/dinky-release-1.19-1.2.1

# Enable the S3 filesystem plugin: each plugin needs its own subdirectory under /opt/flink/plugins
RUN mkdir -p /opt/flink/plugins/s3-fs-hadoop
RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.19.1.jar /opt/flink/plugins/s3-fs-hadoop/flink-s3-fs-hadoop-1.19.1.jar

Then the following error appears in the job manager pod log:

Detailed error log:

error.log
ckend@31fdccd4
2025-01-24 08:03:27,528 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as HashMapStateBackend
2025-01-24 08:03:27,527 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Socket Stream -> Flat Map (1/1)#0 (6bb97f2ca3d0350ca5662f172da01ed5_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from DEPLOYING to FAILED with failure cause: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
    at org.apache.flink.streaming.api.graph.StreamConfig.getVertexNonChainedOutputs(StreamConfig.java:586) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1674) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriterDelegate(StreamTask.java:1658) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:423) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:377) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:108) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:104) ~[flink-dist-1.19.1.jar:1.19.1]
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_432]
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_432]
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_432]
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_432]
    at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1615) [flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:737) [flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [flink-dist-1.19.1.jar:1.19.1]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_432]
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.keySelector of type org.apache.flink.api.java.functions.KeySelector in instance of org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301) ~[?:1.8.0_432]
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431) ~[?:1.8.0_432]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2437) ~[?:1.8.0_432]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355) ~[?:1.8.0_432]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) ~[?:1.8.0_432]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) ~[?:1.8.0_432]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431) ~[?:1.8.0_432]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355) ~[?:1.8.0_432]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) ~[?:1.8.0_432]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) ~[?:1.8.0_432]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) ~[?:1.8.0_432]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) ~[?:1.8.0_432]
    at java.util.ArrayList.readObject(ArrayList.java:799) ~[?:1.8.0_432]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_432]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_432]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_432]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_432]
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184) ~[?:1.8.0_432]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2322) ~[?:1.8.0_432]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) ~[?:1.8.0_432]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) ~[?:1.8.0_432]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) ~[?:1.8.0_432]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) ~[?:1.8.0_432]
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:481) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.streaming.api.graph.StreamConfig.getVertexNonChainedOutputs(StreamConfig.java:582) ~[flink-dist-1.19.1.jar:1.19.1]
    ... 14 more
2025-01-24 08:03:27,532 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: Socket Stream -> Flat Map (1/1)#0 (6bb97f2ca3d0350ca5662f172da01ed5_cbc357ccb763df2852fee8c4fc7d55f2_0_0).
2025-01-24 08:03:27,534 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using job/cluster config to configure application-defined checkpoint storage: org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@7c42b5cc
2025-01-24 08:03:27,538 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: Socket Stream -> Flat Map (1/1)#0 6bb97f2ca3d0350ca5662f172da01ed5_cbc357ccb763df2852fee8c4fc7d55f2_0_0.
2025-01-24 08:03:27,624 INFO org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task TumblingProcessingTimeWindows -> Sink: Print to Std. Out (1/1)#0 (6bb97f2ca3d0350ca5662f172da01ed5_90bea66de1c231edf33913ecd54406c1_0_0).
2025-01-24 08:03:27,624 INFO org.apache.flink.runtime.taskmanager.Task [] - TumblingProcessingTimeWindows -> Sink: Print to Std. Out (1/1)#0 (6bb97f2ca3d0350ca5662f172da01ed5_90bea66de1c231edf33913ecd54406c1_0_0) switched from DEPLOYING to CANCELING.
2025-01-24 08:03:27,726 INFO org.apache.flink.fs.s3.common.token.AbstractS3DelegationTokenReceiver [] - Updating Hadoop configuration
2025-01-24 08:03:27,728 INFO org.apache.flink.fs.s3.common.token.AbstractS3DelegationTokenReceiver [] - Updated Hadoop configuration successfully
2025-01-24 08:03:27,955 WARN org.apache.hadoop.metrics2.impl.MetricsConfig [] - Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
2025-01-24 08:03:27,972 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - Scheduled Metric snapshot period at 10 second(s).
2025-01-24 08:03:27,972 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - s3a-file-system metrics system started
2025-01-24 08:03:28,037 WARN org.apache.hadoop.util.NativeCodeLoader [] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2025-01-24 08:03:29,168 INFO org.apache.flink.runtime.taskmanager.Task [] - TumblingProcessingTimeWindows -> Sink: Print to Std. Out (1/1)#0 (6bb97f2ca3d0350ca5662f172da01ed5_90bea66de1c231edf33913ecd54406c1_0_0) switched from CANCELING to CANCELED.
2025-01-24 08:03:29,168 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for TumblingProcessingTimeWindows -> Sink: Print to Std. Out (1/1)#0 (6bb97f2ca3d0350ca5662f172da01ed5_90bea66de1c231edf33913ecd54406c1_0_0).
2025-01-24 08:03:29,169 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state CANCELED to JobManager for task TumblingProcessingTimeWindows -> Sink: Print to Std. Out (1/1)#0 6bb97f2ca3d0350ca5662f172da01ed5_90bea66de1c231edf33913ecd54406c1_0_0.
2025-01-24 08:03:29,371 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:1, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1, taskHeapMemory=537.600mb (563714445 bytes), taskOffHeapMemory=0 bytes, managedMemory=634.880mb (665719939 bytes), networkMemory=158.720mb (166429984 bytes)}, allocationId: e8bf533a3b168883373f90f9b4c9ece6, jobId: 52597cb193dc9054f3871cd4434c7c96).
2025-01-24 08:03:29,373 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 52597cb193dc9054f3871cd4434c7c96 from job leader monitoring.
2025-01-24 08:03:29,373 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job 52597cb193dc9054f3871cd4434c7c96.
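The decisive line in the log is `Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field ...KeyGroupStreamPartitioner.keySelector`. As background only (not a fix), the plain-JDK sketch below shows what that message refers to: a serializable lambda, such as a `keyBy` key selector written as a lambda, is written to the stream as a `java.lang.invoke.SerializedLambda` and only becomes a real functional object again when its capturing class can be resolved while reading. `SerializableKeySelector` and `Holder` are hypothetical stand-ins for Flink's `KeySelector` and `KeyGroupStreamPartitioner`; no Flink classes are used, and this is an illustration under those assumptions, not Flink's actual code path.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.function.Function;

public class SerializedLambdaDemo {

    // Hypothetical stand-in for Flink's KeySelector: a functional interface that is Serializable.
    interface SerializableKeySelector<IN, KEY> extends Function<IN, KEY>, Serializable {}

    // Hypothetical stand-in for KeyGroupStreamPartitioner: a field typed as the interface.
    static final class Holder implements Serializable {
        private static final long serialVersionUID = 1L;
        final SerializableKeySelector<String, String> keySelector;

        Holder(SerializableKeySelector<String, String> keySelector) {
            this.keySelector = keySelector;
        }
    }

    /** Returns the SerializedLambda that Java serialization writes in place of the lambda. */
    static SerializedLambda toSerializedLambda(Serializable lambda) throws ReflectiveOperationException {
        // The runtime-generated class of a serializable lambda has a private writeReplace().
        Method writeReplace = lambda.getClass().getDeclaredMethod("writeReplace");
        writeReplace.setAccessible(true);
        return (SerializedLambda) writeReplace.invoke(lambda);
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SerializableKeySelector<String, String> byFirstWord = line -> line.split(" ")[0];

        // What actually goes on the wire: a SerializedLambda naming the capturing class.
        SerializedLambda form = toSerializedLambda(byFirstWord);
        System.out.println("capturing class: " + form.getCapturingClass());
        System.out.println("implementation method: " + form.getImplMethodName());

        // The round trip succeeds here because the capturing class (SerializedLambdaDemo)
        // is resolvable by the class loader ObjectInputStream uses, so the SerializedLambda
        // is turned back into a SerializableKeySelector before it is assigned to the field.
        Holder copy = (Holder) deserialize(serialize(new Holder(byFirstWord)));
        System.out.println("key: " + copy.keySelector.apply("hello flink"));
    }
}
```

In this single-class-loader sketch the round trip succeeds. In the log, the `keySelector` field instead received the raw `SerializedLambda`, so that resolution step did not complete on the task side; whether this comes from how the job JAR and the Dinky JAR in `/opt/flink/lib` are loaded on k8s is an open assumption that the log alone does not confirm.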

What you expected to happen

The job runs in local mode, and I expect it to run in k8s mode as well.
This bug blocks my job submission on k8s.

How to reproduce

Job environment:
Flink demo job SocketWindowWordCount.jar

k8s environment:
Server Version: v1.27.13

Anything else

No response

Version

1.2.0/1.2.1

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

1984man added the Bug (Something isn't working) and Waiting for reply labels on Jan 24, 2025

Hello @1984man, this issue is about K8S, so I assign it to @gaoyan1998 and @zackyoungh. If you have any questions, you can comment and reply.

