Apache Livy Examples

Spark Example

Here’s a step-by-step example of interacting with Livy in Python with the Requests library. By default Livy runs on port 8998 (which can be changed with the livy.server.port config option). We’ll start off with a Spark session that takes Scala code:

sudo pip install requests
import json, pprint, requests, textwrap
host = 'http://localhost:8998'
data = {'kind': 'spark'}
headers = {'Content-Type': 'application/json'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
r.json()

{u'state': u'starting', u'id': 0, u'kind': u'spark'}

Once the session has completed starting up, it transitions to the idle state:

session_url = host + r.headers['location']
r = requests.get(session_url, headers=headers)
r.json()

{u'state': u'idle', u'id': 0, u'kind': u'spark'}

Now we can execute Scala by passing in a simple JSON command:

statements_url = session_url + '/statements'
data = {'code': '1 + 1'}
r = requests.post(statements_url, data=json.dumps(data), headers=headers)
r.json()

{u'output': None, u'state': u'running', u'id': 0}

If a statement takes longer than a few milliseconds to execute, Livy returns early and provides a statement URL that can be polled until it is complete:

statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
pprint.pprint(r.json())

{u'id': 0,
  u'output': {u'data': {u'text/plain': u'res0: Int = 2'},
              u'execution_count': 0,
              u'status': u'ok'},
  u'state': u'available'}

That was a pretty simple example. More interesting is using Spark to estimate Pi. This is from the Spark Examples:

data = {
  'code': textwrap.dedent("""
    val NUM_SAMPLES = 100000;
    val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
      val x = Math.random();
      val y = Math.random();
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _);
    println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)
    """)
}

r = requests.post(statements_url, data=json.dumps(data), headers=headers)
pprint.pprint(r.json())

statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
pprint.pprint(r.json())

{u'id': 1,
 u'output': {u'data': {u'text/plain': u'Pi is roughly 3.14004\nNUM_SAMPLES: Int = 100000\ncount: Int = 78501'},
             u'execution_count': 1,
             u'status': u'ok'},
 u'state': u'available'}

Finally, close the session:

session_url = 'http://localhost:8998/sessions/0'
requests.delete(session_url, headers=headers)

<Response [204]>

PySpark Example

PySpark has the same API, just with a different initial request:

data = {'kind': 'pyspark'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
r.json()

{u'id': 1, u'state': u'idle'}

The Pi example from before then can be run as:

data = {
  'code': textwrap.dedent("""
    import random
    NUM_SAMPLES = 100000
    def sample(p):
      x, y = random.random(), random.random()
      return 1 if x*x + y*y < 1 else 0

    count = sc.parallelize(xrange(0, NUM_SAMPLES)).map(sample).reduce(lambda a, b: a + b)
    print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)
    """)
}

r = requests.post(statements_url, data=json.dumps(data), headers=headers)
pprint.pprint(r.json())

{u'id': 12,
u'output': {u'data': {u'text/plain': u'Pi is roughly 3.136000'},
            u'execution_count': 12,
            u'status': u'ok'},
u'state': u'running'}

SparkR Example

SparkR has the same API:

data = {'kind': 'sparkr'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
r.json()

{u'id': 1, u'state': u'idle'}

The Pi example from before then can be run as:

data = {
  'code': textwrap.dedent("""
    n <- 100000
    piFunc <- function(elem) {
      rands <- runif(n = 2, min = -1, max = 1)
      val <- ifelse((rands[1]^2 + rands[2]^2) < 1, 1.0, 0.0)
      val
    }
    piFuncVec <- function(elems) {
      message(length(elems))
      rands1 <- runif(n = length(elems), min = -1, max = 1)
      rands2 <- runif(n = length(elems), min = -1, max = 1)
      val <- ifelse((rands1^2 + rands2^2) < 1, 1.0, 0.0)
      sum(val)
    }
    rdd <- parallelize(sc, 1:n, slices)
    count <- reduce(lapplyPartition(rdd, piFuncVec), sum)
    cat("Pi is roughly", 4.0 * count / n, "\n")
    """)
}

r = requests.post(statements_url, data=json.dumps(data), headers=headers)
pprint.pprint(r.json())

{u'id': 12,
 u'output': {u'data': {u'text/plain': u'Pi is roughly 3.136000'},
             u'execution_count': 12,
             u'status': u'ok'},
 u'state': u'running'}