How to use the Livy Spark REST Job Server API for submitting batch jar, Python and Streaming Jobs

Livy is an open source REST interface for interacting with Spark from anywhere. It supports executing snippets of code or programs in a Spark Context that runs locally or in YARN.

Note: Livy is not supported in CDH, only in the upstream Hue community.

 

We previously detailed how to use the interactive Shell API (aka Spark shells) and how to create remote shared RDDs. In this follow-up we will see how to execute batch jobs (aka spark-submit) in YARN. These jobs can be Java or Scala programs compiled into a jar, or just Python files. One advantage of using Livy is that jobs can be submitted remotely and don't need to implement any special interface or be recompiled.

[Architecture diagram: Livy wraps spark-submit and executes it remotely]

Starting the REST server

This is described in the previous post.

We are using the YARN mode here, so all the paths need to exist on HDFS. For local dev mode, just use local paths on your machine.

 

Submitting a Jar

Livy offers a wrapper around spark-submit that works with jar and py files. The API is slightly different from the interactive one. Let's start by listing the active running jobs:

curl localhost:8998/sessions | python -m json.tool
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    34    0    34    0     0   2314      0 --:--:-- --:--:-- --:--:--  2428
{
    "from": 0,
    "sessions": [],
    "total": 0
}

Then we upload the Spark example jar /usr/lib/spark/lib/spark-examples.jar to HDFS (one way to do this is sketched below) and point to it. If you are using Livy in local mode and not YARN mode, just keep the local path /usr/lib/spark/lib/spark-examples.jar.
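
A minimal sketch of the upload step, assuming the hdfs command line client is configured on the same machine (the destination matches the path used in the POST that follows):

# Create the user directory if needed and copy the example jar to HDFS
hdfs dfs -mkdir -p /user/romain
hdfs dfs -put /usr/lib/spark/lib/spark-examples.jar /user/romain/spark-examples.jar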

curl -X POST --data '{"file": "/user/romain/spark-examples.jar", "className": "org.apache.spark.examples.SparkPi"}' -H "Content-Type: application/json" localhost:8998/batches
{"id":0,"state":"running","log":[]}

We get the submission id, in our case 0, and can check its progress. It should actually already be done:

curl localhost:8998/batches/0 | python -m json.tool
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   902    0   902    0     0  91120      0 --:--:-- --:--:-- --:--:--   97k
{
    "id": 0,
    "log": [
        "15/10/20 16:32:21 INFO ui.SparkUI: Stopped Spark web UI at http://192.168.1.30:4040",
        "15/10/20 16:32:21 INFO scheduler.DAGScheduler: Stopping DAGScheduler",
        "15/10/20 16:32:21 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!",
        "15/10/20 16:32:21 INFO storage.MemoryStore: MemoryStore cleared",
        "15/10/20 16:32:21 INFO storage.BlockManager: BlockManager stopped",
        "15/10/20 16:32:21 INFO storage.BlockManagerMaster: BlockManagerMaster stopped",
        "15/10/20 16:32:21 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!",
        "15/10/20 16:32:21 INFO spark.SparkContext: Successfully stopped SparkContext",
        "15/10/20 16:32:21 INFO util.ShutdownHookManager: Shutdown hook called",
        "15/10/20 16:32:21 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-6e362908-465a-4c67-baa1-3dcf2d91449c"
    ],
    "state": "success"
}
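
If the job were still running, we could simply poll this endpoint until the state changes. A minimal sketch of such a loop in the shell, parsing the JSON with python as in the examples above:

while true; do
  # Read the current state of batch 0 from the Livy REST API
  state=$(curl -s localhost:8998/batches/0 | python -c 'import json,sys; print(json.load(sys.stdin)["state"])')
  echo "state: $state"
  # Stop polling once the batch is no longer starting or running
  if [ "$state" != "starting" ] && [ "$state" != "running" ]; then
    break
  fi
  sleep 5
done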

We can see the output logs:

curl localhost:8998/batches/0/log |  python -m json.tool
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  5378    0  5378    0     0   570k      0 --:--:-- --:--:-- --:--:--  583k
{
    "from": 0,
    "id": 3,
    "log": [
        "SLF4J: Class path contains multiple SLF4J bindings.",
        "SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]",
        "SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]",
        "SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.",
        "SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]",
        "15/10/21 01:37:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable",
        "15/10/21 01:37:27 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032",
        "15/10/21 01:37:27 INFO yarn.Client: Requesting a new application from cluster with 1 NodeManagers",
        "15/10/21 01:37:27 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)",
        "15/10/21 01:37:27 INFO yarn.Client: Will allocate AM container, with 1408 MB memory including 384 MB overhead",
        "15/10/21 01:37:27 INFO yarn.Client: Setting up container launch context for our AM",
        "15/10/21 01:37:27 INFO yarn.Client: Setting up the launch environment for our AM container",
        "15/10/21 01:37:27 INFO yarn.Client: Preparing resources for our AM container",
        ....
        ....
        "15/10/21 01:37:40 INFO yarn.Client: Application report for application_1444917524249_0004 (state: RUNNING)",
        "15/10/21 01:37:41 INFO yarn.Client: Application report for application_1444917524249_0004 (state: RUNNING)",
        "15/10/21 01:37:42 INFO yarn.Client: Application report for application_1444917524249_0004 (state: FINISHED)",
        "15/10/21 01:37:42 INFO yarn.Client: ",
        "\t client token: N/A",
        "\t diagnostics: N/A",
        "\t ApplicationMaster host: 192.168.1.30",
        "\t ApplicationMaster RPC port: 0",
        "\t queue: root.romain",
        "\t start time: 1445416649481",
        "\t final status: SUCCEEDED",
        "\t tracking URL: http://unreal:8088/proxy/application_1444917524249_0004/A",
        "\t user: romain",
        "15/10/21 01:37:42 INFO util.ShutdownHookManager: Shutdown hook called",
        "15/10/21 01:37:42 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-26cdc4d9-071e-4420-a2f9-308a61af592c"
    ],
    "total": 67
}

We can add an argument to the command, for example 100 iterations; that way the result is more precise and the job will run longer:

curl -X POST --data '{"file": "/usr/lib/spark/lib/spark-examples.jar", "className": "org.apache.spark.examples.SparkPi", "args": ["100"]}' -H "Content-Type: application/json" localhost:8998/batches
{"id":1,"state":"running","log":[]}

In case we want to stop the running job, we just issue:

curl -X DELETE localhost:8998/batches/1
{"msg":"deleted"}

Doing it another time will return a "session not found" error, as the job was removed from Livy:

curl -X DELETE localhost:8998/batches/1
session not found

Submitting a Python job

Submitting Python jobs is almost identical to submitting jar jobs. We uncompress the Spark examples and upload pi.py to HDFS:

~/tmp$ tar -zxvf /usr/lib/spark/examples/lib/python.tar.gz
./
./sql.py
./kmeans.py
./cassandra_outputformat.py
./mllib/
./mllib/correlations.py
./mllib/kmeans.py
....
....
./streaming/flume_wordcount.py
./streaming/recoverable_network_wordcount.py
./streaming/hdfs_wordcount.py
./streaming/kafka_wordcount.py
./streaming/stateful_network_wordcount.py
./streaming/sql_network_wordcount.py
./streaming/mqtt_wordcount.py
./streaming/network_wordcount.py
./streaming/direct_kafka_wordcount.py
./wordcount.py
./pi.py
./hbase_inputformat.py
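
The extracted pi.py can then be copied to HDFS the same way as the jar, for example (assuming the same destination directory as in the POST below):

# Copy pi.py from the extracted examples to HDFS
hdfs dfs -put ./pi.py /user/romain/pi.py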

Then start the job:

curl -X POST --data '{"file": "/user/romain/pi.py"}' -H "Content-Type: application/json" localhost:8998/batches
{"id":2,"state":"starting","log":[]}

As always, we can check its status with a simple GET:

curl localhost:8998/batches/2 |  python -m json.tool
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   616    0   616    0     0  77552      0 --:--:-- --:--:-- --:--:-- 88000
{
    "id": 2,
    "log": [
        "\t ApplicationMaster host: 192.168.1.30",
        "\t ApplicationMaster RPC port: 0",
        "\t queue: root.romain",
        "\t start time: 1445417899564",
        "\t final status: UNDEFINED",
        "\t tracking URL: http://unreal:8088/proxy/application_1444917524249_0006/",
        "\t user: romain",
        "15/10/21 01:58:26 INFO yarn.Client: Application report for application_1444917524249_0006 (state: RUNNING)",
        "15/10/21 01:58:27 INFO yarn.Client: Application report for application_1444917524249_0006 (state: RUNNING)",
        "15/10/21 01:58:28 INFO yarn.Client: Application report for application_1444917524249_0006 (state: RUNNING)"
    ],
    "state": "running"
}

And we can get the output by adding the /log suffix:

curl localhost:8998/batches/2/log |  python -m json.tool
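
The log responses include from and total fields, and the endpoint accepts from and size query parameters to page through long logs (check the Batch API documentation for your Livy version), for example:

curl "localhost:8998/batches/2/log?from=0&size=50" | python -m json.tool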

Submitting a Streaming job

In many cases, a Streaming job is simply a batch job that we submit. Here is how to submit the Solr Spark streaming job that collects live tweets and indexes them into a Dynamic Search Dashboard.
After compiling the jar, we upload it to HDFS, along with the twitter4j.properties file.

curl -X POST --data '{"file": "/user/romain/spark-solr-1.0-SNAPSHOT.jar", "className": "com.lucidworks.spark.SparkApp", "args": ["twitter-to-solr", "-zkHost", "localhost:9983", "-collection", "tweets"], "files": ["/user/romain/twitter4j.properties"]}' -H "Content-Type: application/json" localhost:8998/batches
{"id":3,"state":"starting","log":[]}

We check the status and see that it is running correctly:

curl localhost:8998/batches/3 |  python -m json.tool
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   842    0   842    0     0  82947      0 --:--:-- --:--:-- --:--:-- 84200
{
    "id": 3,
    "log": [
        "\t start time: 1445420201439",
        "\t final status: UNDEFINED",
        "\t tracking URL: http://unreal:8088/proxy/application_1444917524249_0009/",
        "\t user: romain",
        "15/10/21 02:36:47 INFO yarn.Client: Application report for application_1444917524249_0009 (state: RUNNING)",
        "15/10/21 02:36:48 INFO yarn.Client: Application report for application_1444917524249_0009 (state: RUNNING)",
        "15/10/21 02:36:49 INFO yarn.Client: Application report for application_1444917524249_0009 (state: RUNNING)",
        "15/10/21 02:36:50 INFO yarn.Client: Application report for application_1444917524249_0009 (state: RUNNING)",
        "15/10/21 02:36:51 INFO yarn.Client: Application report for application_1444917524249_0009 (state: RUNNING)",
        "15/10/21 02:36:52 INFO yarn.Client: Application report for application_1444917524249_0009 (state: RUNNING)"
    ],
    "state": "running"
}

If we open the Dashboard and configure it as in the blog post, we can see the tweets coming in:

[Screenshot: tweets appearing live in the Search Dashboard]

At the end, we can just stop the job with:

curl -X DELETE localhost:8998/batches/3

 

You can refer to the Batch API documentation for how to specify additional spark-submit properties. For example, to add a custom name or queue:

curl -X POST --data '{"file": "/usr/lib/spark/lib/spark-examples.jar", "className": "org.apache.spark.examples.SparkPi", "queue": "my_queue", "name": "Livy Pi Example"}' -H "Content-Type: application/json" localhost:8998/batches
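
The same documentation also covers resource settings such as driverMemory, executorMemory and numExecutors (availability may vary by Livy version); a sketch with purely illustrative values:

curl -X POST --data '{"file": "/usr/lib/spark/lib/spark-examples.jar", "className": "org.apache.spark.examples.SparkPi", "args": ["100"], "driverMemory": "1g", "executorMemory": "2g", "numExecutors": 2}' -H "Content-Type: application/json" localhost:8998/batches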

Next time we will explore magic keywords and how to integrate better with IPython!

If you want to learn more about the Livy Spark REST Api, feel free to send questions on the user list or meet up in person at the upcoming Spark Summit in Amsterdam!

 

17 Comments

  1. Harpreet Chopra 1 year ago

    It looks like each batch has a dedicated session, and probably a different sparkcontext.
    Can batch processes be submitted for an existing session, just like interactive statements?
    We need to share sparkcontexts/RDDs between batch jobs. Is that feasible?

    • Hue Team 1 year ago

      Yes, batches are like a 'spark-submit' command. To keep the context open, you would need to submit into an interactive session or investigate the Remote Spark Context of the Java API.

      • Pritpal Singh 8 months ago

        Does Livy offer functionality like Spark Job Server where, instead of running statements interactively, you are running precompiled jars interactively and sharing the SparkContext (SparkSession in case of Spark 2.0.x) between the execution of each jar? I see that you can execute a precompiled jar as a batch, but that is the equivalent of a single spark-submit. It does not keep the session alive. Please let me know if I'm missing something here. I spent a lot of time figuring this out. Please help.

        • Author
          Hue Team 7 months ago

          AFAIK you would need to start an interactive session, load your jars there and execute them; batch is indeed like a wrapper around spark-submit. Feel free to follow up on livy.io

        • Nag Arjun 5 months ago

          Hi Pritpal,

          We are also trying to use an existing session and submit jobs using Livy. Did you have any success with this? You also mentioned that if we want to execute PySpark code we have to submit compiled jar files. So is there any way we can submit Python code to an existing Spark session using the Livy job submit provider? Please let us know if you have any leads on this.

          Thanks in advance.

  2. Dhiraj 1 year ago

    Hey,
    I am not able to use curl in PowerShell to launch a Spark job on an HDInsight cluster.

    curl -k --user "admin:password" -v -H 'Content-Type: application/json' -X POST -d '{ "file":"https://storage.blob.core.windows.net/spark-container/sparkjob-jar.jar", "className":"io.spark.Driver" }' "https://cluster-spark.azurehdinsight.net/livy/batches"

    error: Cannot bind parameter 'Headers'. Cannot convert the "Content-Type: application/json" value of type "System.String" to type "System.Collections.IDictionary".

    • Hue Team 1 year ago

      We would recommend asking this question on the HDInsight forum as it is not directly related to Hue!

  3. Carlos 12 months ago

    Hi, I'm trying to submit a Python job but I cannot make it work. If I place the pi.py file in the local file system (e.g. /home/livy/pi.py), when executing the curl command I get:

    curl -X POST --data '{"file": "file:///home/livy/pi.py"}' -H "Content-Type: application/json" localhost:8998/batches

    "requirement failed: Local path /home/livy/pi.py cannot be added to user sessions."

    If I put the file in HDFS (hadoop fs -put pi.py hdfs:///) then I execute the curl command:

    curl -X POST --data '{"file": "/pi.py"}' -H "Content-Type: application/json" localhost:8998/batches
    {"id":7,"state":"running","log":[]}

    The command executes fine but the batch is in error state:

    curl localhost:8998/batches/7 | python -m json.tool
    {
    "id": 7,
    "log": [
    " spark.yarn.executor.memoryOverhead -> 384",
    " spark.yarn.submit.file.replication -> 3",
    " spark.yarn.containerLauncherMaxThreads -> 25",
    " spark.yarn.driver.memoryOverhead -> 384",
    " spark.history.kerberos.keytab -> none",
    " spark.eventLog.dir -> hdfs:///spark-history",
    " spark.yarn.preserve.staging.files -> false",
    "",
    " .primaryResource",
    "Run with --help for usage help or --verbose for debug output"
    ],
    "state": "error"
    }

    and the log is as follows:

    curl localhost:8998/batches/7/log | python -m json.tool
    {
    "from": 0,
    "id": 7,
    "log": [
    "Error: Only local python files are supported: Parsed arguments:",
    " master local[*]",
    " deployMode client",
    " executorMemory null",
    " executorCores null",
    " totalExecutorCores null",
    " propertiesFile /usr/hdp/current/spark-historyserver/conf/spark-defaults.conf",
    " driverMemory null",
    " driverCores null",
    " driverExtraClassPath null",
    " driverExtraLibraryPath /usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64",
    " driverExtraJavaOptions null",
    " supervise false",
    " queue null",
    " numExecutors null",
    " files null",
    " pyFiles null",
    " archives null",
    " mainClass null",
    " primaryResource hdfs://sandbox.hortonworks.com:8020/pi.py",
    " name Livy",
    " childArgs []",
    " jars null",
    " packages null",
    " packagesExclusions null",
    " repositories null",
    " verbose false",
    "",
    "Spark properties used, including those specified through",
    " --conf and those from the properties file /usr/hdp/current/spark-historyserver/conf/spark-defaults.conf:",
    " spark.yarn.queue -> default",
    " spark.history.kerberos.principal -> none",
    " spark.executor.extraLibraryPath -> /usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64",
    " spark.yarn.max.executor.failures -> 3",
    " spark.driver.extraLibraryPath -> /usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64",
    " spark.yarn.historyServer.address -> sandbox.hortonworks.com:18080",
    " spark.eventLog.enabled -> true",
    " spark.history.ui.port -> 18080",
    " spark.history.provider -> org.apache.spark.deploy.history.FsHistoryProvider",
    " spark.history.fs.logDirectory -> hdfs:///spark-history",
    " spark.yarn.scheduler.heartbeat.interval-ms -> 5000",
    " spark.yarn.executor.memoryOverhead -> 384",
    " spark.yarn.submit.file.replication -> 3",
    " spark.yarn.containerLauncherMaxThreads -> 25",
    " spark.yarn.driver.memoryOverhead -> 384",
    " spark.history.kerberos.keytab -> none",
    " spark.eventLog.dir -> hdfs:///spark-history",
    " spark.yarn.preserve.staging.files -> false",
    "",
    " .primaryResource",
    "Run with --help for usage help or --verbose for debug output"
    ],
    "total": 51
    }

    It seems that it doesn't let me submit the Python file from the local filesystem, but when I submit it from HDFS, it says "Error: Only local python files are supported:". I must be doing something wrong but I cannot figure out what.

    Thanks in advance.

  4. dominic 11 months ago

    Fixed this by setting "spark.master" to "yarn-cluster" in spark-defaults.conf

    sample config:

    spark.driver.extraJavaOptions -Dhdp.version=2.3.6.0-3796
    spark.yarn.am.extraJavaOptions -Dhdp.version=2.3.6.0-3796
    spark.master = yarn-cluster

    ..
    .

    found the configuration in https://groups.google.com/a/cloudera.org/forum/#!topic/livy-user/8TNqxt1ObO8

  5. Sneha 6 months ago

    I tried submitting a Spark job with the below command:
    curl -X POST --data '{"file":"file:///local/snehh/example.jar", "className":"com.data.ExampleSpark"}' -H "Content-Type:application/json" localhost:8998/batches

    my example.jar is located in my local directory.

    And I get the error below:
    "requirement failed: Local path /local/snehh/AdRunwayScalaOperators-1.0.jar cannot be added to user sessions."

    Please help me.

    • Sneha 6 months ago

      I also tried submitting after copying the application jar to the HDFS file system. Then I get the following error:

      "Warning: Skip remote jar hdfs://abh2hdfs/local/snehh/example.jar.", "java.lang.ClassNotFoundException: com.data.ExampleSpark"
      But when I submit the Spark job directly with spark-submit without using Livy it works fine. Where am I going wrong?

      I have following in my spark conf
      spark.master yarn-cluster
      spark.submit.deployMode cluster
      spark.driver.memory 4g
      spark.driver.maxResultSize 8g
      spark.executor.memory 8g
      spark.eventLog.enabled false

      • Kiran 4 months ago

        Kindly let me know if you got a solution for the above problem, as I am facing the same issue.

    • Doan Viet Dung 5 months ago

      @Sneha

      In the livy.conf file, add livy.file.local-dir-whitelist = /local/snehh/

      If there are many paths, separate them with commas.

  6. Bryan Jacobs 3 months ago

    Could someone explain to me how to do this step:

    “Then we upload the Spark example jar /usr/lib/spark/lib/spark-examples.jar on HDFS and point to it.”

    I’m trying out livy but this step doesn’t make sense to me because I’m not very familiar with HDFS,

    • Author
      Hue Team 3 months ago

      In the input box of the Spark Jar snippet, upload the jar from your local computer to HDFS and then just select it.
