
Basically, I want to deploy a custom Flink JAR file to a new AWS EMR cluster. Here is a summary of what I did to create the cluster:

  • Step 1: Software and Steps -

  • Created an AWS EMR cluster with Flink as the service (EMR release version 5.17.0) and selected Flink 1.5.2 as the software configuration.
  • Entered the following configuration JSON (a CLI equivalent is sketched after Step 4):

    [
      {
        "Classification": "flink-conf",
        "Properties": {
          "jobmanager.heap.mb": "3072",
          "taskmanager.heap.mb": "51200",
          "taskmanager.numberOfTaskSlots": "2",
          "taskmanager.memory.preallocate": "false",
          "parallelism.default": "1"
        }
      }
    ]
  • Step 2: Hardware - No change to the hardware configuration. By default there is 1 master, 2 core, and 0 task instances, all of type m3.xlarge.

  • Step 3: General Cluster Settings - No change here.
  • Step 4: Security - Provided my EC2 key pair.
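  • For reference, the creation steps above should correspond roughly to a single AWS CLI call; this is only a sketch based on my settings (the key pair name and the flink-configuration.json file holding the JSON from Step 1 are placeholders):

    aws emr create-cluster --name flink-emr-cluster \
      --release-label emr-5.17.0 \
      --applications Name=Flink \
      --configurations file://./flink-configuration.json \
      --instance-type m3.xlarge --instance-count 3 \
      --ec2-attributes KeyName=my-ec2-key-pair \
      --use-default-roles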
  • Once the cluster was ready, I SSHed into the EC2 master node and tried to deploy the custom JAR file. Below are the errors I got each time I tried to deploy it via the CLI.

    flink run -m yarn-cluster -yn 2 -c com.deepak.flink.examples.WordCount flink-examples-assembly-1.0.jar

    Using the result of 'hadoop classpath' to augment the Hadoop classpath: /etc/hadoop/conf:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/.//*:/usr/lib/hadoop-hdfs/./:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/.//*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/.//*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/.//*::/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/cloudwatch-sink/lib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/usr/lib/flink/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    2018-10-09 06:30:36,766 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at ip-IPADDRESS.ec2.internal/IPADDRESS:8032
    2018-10-09 06:30:36,909 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
    2018-10-09 06:30:37,168 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Killing YARN application
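
    For reference, I understand another pattern on EMR is to start a detached YARN session first and then submit the job against it; a rough sketch (assuming the EMR-provided flink-yarn-session wrapper and Flink 1.5 CLI flags):

    # Start a detached Flink session on YARN with 2 TaskManagers
    flink-yarn-session -n 2 -d

    # Submit against the running session; the CLI should find it via the
    # YARN properties file written when the session started
    flink run -c com.deepak.flink.examples.WordCount flink-examples-assembly-1.0.jar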
      

    flink run -c com.deepak.flink.examples.WordCount flink-examples-assembly-1.0.jar

    Using the result of 'hadoop classpath' to augment the Hadoop classpath: /etc/hadoop/conf:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/.//*:/usr/lib/hadoop-hdfs/./:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/.//*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/.//*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/.//*::/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/cloudwatch-sink/lib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/usr/lib/flink/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    ------------------------------------------------------------
     The program finished with the following exception:
    org.apache.flink.client.deployment.ClusterRetrieveException: Couldn't retrieve standalone cluster
        at org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:51)
        at org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:31)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:253)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1840)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
    Caused by: org.apache.flink.util.ConfigurationException: Config parameter 'Key: 'jobmanager.rpc.address' , default: null (deprecated keys: [])' is missing (hostname/address of JobManager to connect to).
        at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getJobManagerAddress(HighAvailabilityServicesUtils.java:141)
        at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:81)
        at org.apache.flink.client.program.ClusterClient.<init>(ClusterClient.java:158)
        at org.apache.flink.client.program.rest.RestClusterClient.<init>(RestClusterClient.java:183)
        at org.apache.flink.client.program.rest.RestClusterClient.<init>(RestClusterClient.java:156)
        at org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:49)
        ... 10 more
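
    If I read the trace correctly, without -m yarn-cluster (or an active session's properties file) the CLI falls back to standalone mode and looks for jobmanager.rpc.address in flink-conf.yaml. To check what the client sees (assuming the EMR default conf path):

    # Show the JobManager address/port the standalone client would use;
    # /etc/flink/conf is where EMR places flink-conf.yaml by default (assumption)
    grep -E 'jobmanager\.rpc\.(address|port)' /etc/flink/conf/flink-conf.yaml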
    

    I also tried to deploy via the AWS Web UI, and the JAR failed to deploy there as well.
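
    For completeness, my understanding is that the Web UI step roughly corresponds to an EMR step that invokes the Flink CLI through command-runner.jar; a sketch with the AWS CLI (the cluster ID and JAR path are placeholders):

    aws emr add-steps --cluster-id j-XXXXXXXXXXXXX --steps \
      Type=CUSTOM_JAR,Name=FlinkWordCount,Jar=command-runner.jar,Args="flink","run","-m","yarn-cluster","-yn","2","-c","com.deepak.flink.examples.WordCount","/home/hadoop/flink-examples-assembly-1.0.jar"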

    So, basically, I want to deploy the custom JAR to the Flink YARN cluster. I am not sure what I am missing in the YARN/Flink configuration, or elsewhere. Thanks in advance for any help.
