Aggregating HTTP Server Requests with Spark Structured Streaming
In this article, I will cover two key concepts:
- Creating your own REST API in Scala within 5 minutes
- Building aggregations on incoming HTTP requests
To create a REST API, we can use the JDK's built-in HttpServer library (`com.sun.net.httpserver`), which lets us start a server on a given port and register a handler under a base URL. The handler is what sends a response back to the client.
On top of that, we will run a Spark Structured Streaming application, backed by a MemoryStream, that aggregates the metrics on the fly.
While the application is running, it recalculates the aggregations over all the data received so far every 20 seconds (the trigger interval). I have chosen the Complete output mode to stream the results to the console.
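Sample Postman request: send a POST to http://localhost:9999/ whose body is a single booking in JSON, with the fields bookingId, userId, fromLocation, toLocation, airlines, orderPlacedDt and a nested paymentDetails object holding transactionId, paymentMethod and amount.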
The complete source code is below:
package Streaming
import Streaming.BookingDataConsumer.spark
import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SQLContext, SparkSession, functions}
import org.apache.spark.sql.streaming.Trigger
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{col, count, from_json}
import java.net.InetSocketAddress
import java.sql.Timestamp
import scala.io.Source
/**
 * A single HTTP request captured by the REST source and pushed into the memory stream.
 *
 * @param value     the raw request body text
 * @param timestamp the wall-clock time at which the request was handled
 */
case class HttpData(
  value: String,
  timestamp: Timestamp
)
/**
 * Spark Structured Streaming job fed by a minimal HTTP endpoint.
 *
 * Every request body received on [[PORT]] under the base URL "/" is appended to an in-memory
 * stream, parsed as a booking JSON document, and aggregated per airline (total sales, booking
 * count, highest fare). The aggregate is recomputed every [[INTERVAL]] and printed to the
 * console in "complete" output mode.
 *
 * Optional program argument: path of a sample booking JSON file, used only to infer the
 * payload schema. Without it, a hard-coded default path is used.
 */
object RestAPIBookingConsumer extends App {
  import java.nio.charset.StandardCharsets

  val spark = SparkSession
    .builder()
    .master("local")
    .appName("Rest API Source")
    .getOrCreate()
  import spark.implicits._

  // Port and streaming trigger interval.
  // NOTE: `App` initialises fields in declaration order, so PORT must stay above the toDF call.
  val PORT = 9999
  val INTERVAL = "20 seconds"

  // Sample document used only to infer the schema of incoming payloads.
  private val schemaSamplePath: String =
    args.headOption.getOrElse("/Users/soumav/Documents/BookingData.json")

  // Create HTTP Server and start streaming
  implicit val sqlContext: SQLContext = spark.sqlContext
  val queryDF = toDF(sqlContext, "/")
  val schema = spark.read.json(schemaSamplePath).as[Booking].schema

  // Parse the raw request body and flatten the fields used by the aggregation.
  val metricsDF = queryDF
    .withColumn("i", from_json(col("value"), schema))
    .withColumn("bookingID", col("i.bookingId"))
    .withColumn("userID", col("i.userId"))
    .withColumn("fromLocation", col("i.fromLocation"))
    .withColumn("toLocation", col("i.toLocation"))
    .withColumn("airlines", col("i.airlines"))
    .withColumn("orderPlacedDt", col("i.orderPlacedDt"))
    .withColumn("transactionId", col("i.paymentDetails.transactionId"))
    .withColumn("paymentMethod", col("i.paymentDetails.paymentMethod"))
    .withColumn("amount", col("i.paymentDetails.amount"))

  // Per-airline aggregates over everything received so far (complete output mode).
  val groupedDS = metricsDF
    .groupBy("airlines")
    .agg(
      functions.sum("amount").as("TotalSales"),
      count("bookingId"),
      functions.max("amount").as("HighestFare")
    )

  val query = groupedDS.writeStream
    .outputMode("complete")
    .format("console")
    .option("truncate", false)
    .trigger(Trigger.ProcessingTime(INTERVAL))
    .start()

  // Block the main thread until the streaming query stops or fails.
  query.awaitTermination()

  /**
   * Starts an HTTP server on [[PORT]] and returns a streaming DataFrame fed by its requests.
   *
   * Each request handled under `baseUrl` has its body (decoded as UTF-8) appended, together
   * with the time it was handled, as one [[HttpData]] row of a [[MemoryStream]]. The client
   * receives a small JSON acknowledgement containing that timestamp and the stream offset.
   *
   * @param sqlContext context the memory stream is bound to
   * @param baseUrl    context path the handler is registered under, e.g. "/"
   * @return streaming DataFrame with columns `value` (request body) and `timestamp`
   */
  def toDF(implicit sqlContext: SQLContext, baseUrl: String): DataFrame = {
    implicit val enc: Encoder[HttpData] = Encoders.product[HttpData]
    val stream = MemoryStream[HttpData]

    // Bind to the configured PORT rather than a duplicated literal.
    val server = HttpServer.create(new InetSocketAddress(PORT), 0)
    // A null executor selects the default one, which runs handlers on the thread created by start().
    server.setExecutor(null)
    server.createContext(
      baseUrl,
      new HttpHandler {
        override def handle(httpExchange: HttpExchange): Unit = {
          // Decode explicitly as UTF-8 (not the platform default charset) and release the stream.
          val source = Source.fromInputStream(httpExchange.getRequestBody)(scala.io.Codec.UTF8)
          val payload = try source.mkString finally source.close()
          val timestamp = new java.sql.Timestamp(System.currentTimeMillis())
          val offset = stream.addData(HttpData(payload, timestamp))
          val response = s"""{ "success": true, "timestamp": "$timestamp", "offset": $offset }"""
          sendResponse(httpExchange, status = 200, response)
        }
      }
    )

    // Start server and return streaming DF
    server.start()
    stream.toDF()
  }

  /**
   * Sends `response` as a UTF-8 encoded JSON body with the given status and ends the exchange.
   *
   * @param he       exchange to answer
   * @param status   HTTP status code, e.g. 200
   * @param response JSON text to send
   */
  def sendResponse(he: HttpExchange, status: Int, response: String): Unit = {
    // Content-Length must count encoded BYTES, not String.length (UTF-16 code units);
    // the two differ as soon as the text contains a non-ASCII character.
    val bytes = response.getBytes(StandardCharsets.UTF_8)
    he.getResponseHeaders.set("Content-Type", "application/json")
    // 0 would mean "chunked"; -1 is the documented value for "no body".
    val contentLength = if (bytes.isEmpty) -1L else bytes.length.toLong
    he.sendResponseHeaders(status, contentLength)
    val os = he.getResponseBody
    try {
      if (bytes.nonEmpty) os.write(bytes)
    } finally {
      // Closing the response body also terminates the exchange.
      os.close()
    }
  }
}
Output
-------------------------------------------
Batch: 6
-------------------------------------------
+---------+------------------+----------------+-----------------+
|airlines |TotalSales        |count(bookingId)|HighestFare      |
+---------+------------------+----------------+-----------------+
|Air India|5129.074695530944 |1               |5129.074695530944|
|Indigo   |8583.952885541752 |2               |4465.017094535654|
|GoAir    |16387.541754809987|3               |5948.526468646205|
|Vistara  |5153.629579980829 |1               |5153.629579980829|
|Spicejet |5617.755785487477 |1               |5617.755785487477|
+---------+------------------+----------------+-----------------+
We can use this approach to set up server-side aggregations or real-time analytics based on the requests a server receives.