Spark HTTP Server Requests Aggregations

Soumav Prakash
3 min read · Jul 11, 2021

In this article, I will cover two key concepts:

  1. Creating your own REST API in Scala within 5 minutes
  2. Building aggregations on the incoming HTTP requests

To create a REST API, we can use the JDK's built-in `HttpServer` (package `com.sun.net.httpserver`), which lets us start a server on a given port and register it under a base URL (context path). We also define a handler so that the API can send a response back to the client.

On top of that, we will run a Spark Structured Streaming application backed by a `MemoryStream` source, which aggregates the metrics as the requests arrive.

While the application is running, it recomputes the aggregations every 20 seconds (the trigger interval) over all the data received so far. I have chosen the Complete output mode, so the full, updated result table is printed to the console after each batch.

Sample Postman Request

Please find the source code below

package Streaming

import Streaming.BookingDataConsumer.spark
import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SQLContext, SparkSession, functions}
import org.apache.spark.sql.streaming.Trigger
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{col, count, from_json}

import java.net.InetSocketAddress
import java.sql.Timestamp
import scala.io.Source

/**
 * Row type pushed into the `MemoryStream`: the raw HTTP request body text and the
 * server-side time at which the request was handled.
 *
 * @param value     request body, later parsed with `from_json`
 * @param timestamp arrival time recorded by the HTTP handler
 */
case class HttpData(
  value: String,
  timestamp: Timestamp
)
/**
 * Demo entry point: exposes an HTTP endpoint whose request bodies are appended to a Spark
 * `MemoryStream`, parses each body as booking JSON, and prints per-airline aggregates
 * (total sales, booking count, highest fare) to the console on every `INTERVAL` trigger.
 *
 * NOTE(review): Spark's documentation recommends defining `main` instead of extending
 * `scala.App`, because `App` defers field initialisation (DelayedInit). `App` is kept here so
 * the object's public members stay unchanged for any existing callers.
 */
object RestAPIBookingConsumer extends App {

  import java.nio.charset.StandardCharsets

  val spark = SparkSession
    .builder()
    .master("local")
    .appName("Rest API Source")
    .getOrCreate()

  import spark.implicits._

  // Port and streaming trigger interval. `final val` with a literal makes these compile-time
  // constants, so `toDF` sees the real values regardless of App's delayed initialisation.
  final val PORT = 9999
  final val INTERVAL = "20 seconds"

  // Sample file read only to infer the JSON schema of the incoming request bodies.
  // NOTE(review): machine-specific absolute path — make this configurable before reuse.
  final val SCHEMA_SAMPLE_PATH = "/Users/soumav/Documents/BookingData.json"

  // Create HTTP Server and start streaming
  implicit val sqlContext: SQLContext = spark.sqlContext
  val queryDF = toDF(sqlContext, "/")

  val schema = spark.read.json(SCHEMA_SAMPLE_PATH).as[Booking].schema

  // Parse each body into struct column "i", then flatten the fields used by the aggregation.
  val metricsDF = queryDF
    .withColumn("i", from_json(col("value"), schema))
    .withColumn("bookingID", col("i.bookingId"))
    .withColumn("userID", col("i.userId"))
    .withColumn("fromLocation", col("i.fromLocation"))
    .withColumn("toLocation", col("i.toLocation"))
    .withColumn("airlines", col("i.airlines"))
    .withColumn("orderPlacedDt", col("i.orderPlacedDt"))
    .withColumn("transactionId", col("i.paymentDetails.transactionId"))
    .withColumn("paymentMethod", col("i.paymentDetails.paymentMethod"))
    .withColumn("amount", col("i.paymentDetails.amount"))

  val groupedDS = metricsDF
    .groupBy("airlines")
    .agg(
      functions.sum("amount").as("TotalSales"),
      count("bookingId"),
      functions.max("amount").as("HighestFare")
    )

  // Complete mode re-emits the whole aggregate table on every trigger.
  val query = groupedDS.writeStream
    .outputMode("complete")
    .format("console")
    .option("truncate", false)
    .trigger(Trigger.ProcessingTime(INTERVAL))
    .start()

  // Wait for it...
  query.awaitTermination()

  /**
   * Starts an HTTP server on `PORT` and returns a streaming DataFrame fed by its requests.
   *
   * Every request handled under `baseUrl` (the method is not checked) is appended to a
   * `MemoryStream[HttpData]` as its UTF-8 decoded body plus the handling time. The client
   * receives a JSON acknowledgement containing that timestamp and the offset returned by
   * `addData`. The server is started here and is not stopped by this method.
   *
   * @param sqlContext SQL context the memory stream is created against
   * @param baseUrl    context path the handler is registered under (e.g. "/")
   * @return streaming DataFrame with the columns of [[HttpData]] (`value`, `timestamp`)
   */
  def toDF(implicit sqlContext: SQLContext, baseUrl: String): DataFrame = {
    // Create a memory Stream
    implicit val enc: Encoder[HttpData] = Encoders.product[HttpData]
    val stream = MemoryStream[HttpData]

    // Create server on the configured port (previously hard-coded, ignoring PORT).
    val server = HttpServer.create(new InetSocketAddress(PORT), 0)
    server.setExecutor(null) // null => HttpServer's default executor
    server.createContext(
      baseUrl,
      new HttpHandler {
        override def handle(httpExchange: HttpExchange): Unit = {
          // JSON bodies are UTF-8; don't depend on the platform-default charset.
          val body = Source.fromInputStream(httpExchange.getRequestBody, StandardCharsets.UTF_8.name())
          val payload = try body.mkString finally body.close()
          val timestamp = new Timestamp(System.currentTimeMillis())
          val offset = stream.addData(HttpData(payload, timestamp))
          val response = s"""{ "success": true, "timestamp": "$timestamp", "offset": $offset }"""
          sendResponse(httpExchange, status = 200, response)
        }
      }
    )

    // Start server and return streaming DF
    server.start()
    stream.toDF()
  }

  /**
   * Sends `response` as a UTF-8 encoded JSON body with the given status, then closes the
   * exchange's response stream.
   *
   * @param he       exchange to respond on
   * @param status   HTTP status code
   * @param response JSON text to send
   */
  def sendResponse(he: HttpExchange, status: Int, response: String): Unit = {
    // Content-Length must be the encoded byte count, not String.length (UTF-16 chars);
    // otherwise any non-ASCII response overflows the fixed-length body stream.
    val bytes = response.getBytes(StandardCharsets.UTF_8)
    he.getResponseHeaders.set("Content-Type", "application/json")
    he.sendResponseHeaders(status, bytes.length.toLong)
    val os = he.getResponseBody
    try os.write(bytes)
    finally os.close()
  }

}

Output

-------------------------------------------
Batch: 6
-------------------------------------------
+---------+------------------+----------------+-----------------+
|airlines |TotalSales        |count(bookingId)|HighestFare      |
+---------+------------------+----------------+-----------------+
|Air India|5129.074695530944 |1               |5129.074695530944|
|Indigo   |8583.952885541752 |2               |4465.017094535654|
|GoAir    |16387.541754809987|3               |5948.526468646205|
|Vistara  |5153.629579980829 |1               |5153.629579980829|
|Spicejet |5617.755785487477 |1               |5617.755785487477|
+---------+------------------+----------------+-----------------+

We can use this approach to set up server-side aggregations or real-time analytics based on the requests a server receives.

--

--

Soumav Prakash

Data Engineer 3 @Walmart. Love to play with data, and to find new ways of extracting data from which some meaning can be derived.