A time series is a sequence of data points ordered by time. Time series analysis is a methodology for extracting useful and meaningful information from these data points. Scientific languages such as R support several time series forecasting models, including ARIMA, HoltWinters, and ETS. In an earlier post, we demonstrated using these models to forecast sales from Salesforce (SFDC) data. A major drawback of R is that it is single-threaded, which limits its scalability. In this post, we will explore different alternatives: Python, Spark on Scala, and PySpark. For brevity, we will focus only on the ARIMA model.
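As a quick refresher, this is what fitting and forecasting with an ARIMA model looks like in Python's statsmodels, using the same API as the code in the next section (a minimal sketch on an illustrative toy series):

```python
import numpy as np
from statsmodels.tsa.arima_model import ARIMA

# Toy monthly series; the values are illustrative only
series = [112.0, 118.0, 132.0, 129.0, 121.0, 135.0, 148.0, 136.0, 119.0, 104.0]

# Fit an ARIMA(1, 1, 0) model and forecast the next data point
model_fit = ARIMA(np.asarray(series), order=(1, 1, 0)).fit(disp=0)
forecast, stderr, conf_int = model_fit.forecast()
print forecast[0]
```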

## Sales Forecasting using Python

Unlike R, Python does not support automatic detection of the best ARIMA model. Furthermore, its ARIMA implementation predicts a single data point at a time. The following code works around both limitations: it grid-searches the (p, d, q) orders, keeps the model with the lowest AIC, and repeatedly refits the model to forecast one point at a time.

```python
import sys
import warnings

import numpy as np
from statsmodels.tsa.arima_model import ARIMA

warnings.filterwarnings("ignore")  # ignore convergence warning messages

# Bounds of the (p, d, q) grid search; these values are tunable
MAX_P, MAX_D, MAX_Q = 5, 2, 5

def predictor(amounts, period):
    best_p, best_d, best_q = 0, 1, 0
    best_aic = sys.maxint
    best_history = []
    for p in range(MAX_P):
        for d in range(MAX_D):
            for q in range(MAX_Q):
                try:
                    model = ARIMA(np.asarray(amounts, dtype=np.float64), order=(p, d, q))
                    model_fit = model.fit(disp=0)
                    model_aic = model_fit.aic
                    if model_aic < best_aic:
                        # Forecast one point at a time, feeding each prediction
                        # back into the history and refitting the model
                        history = amounts[0:len(amounts)]
                        for t in range(0, period):
                            model = ARIMA(np.asarray(history, dtype=np.float64), order=(p, d, q))
                            model_fit = model.fit(disp=0)
                            output = model_fit.forecast()
                            history.append(output[0][0])
                        best_history = history
                        best_p, best_d, best_q = p, d, q
                        best_aic = model_aic
                except Exception:
                    continue  # some orders fail to fit or converge; skip them
    print "ARIMA(", best_p, best_d, best_q, ")", "AIC=", best_aic
    return best_history[len(amounts):]
```

The following driver code parallels the R code illustrated before, forecasting the SFDC data:

```python
from datetime import datetime
from dateutil.relativedelta import relativedelta

from util import *

period = 6  # months

# retrieve data
sparkSession = getSparkSession()
data = loadData(sparkSession)
amounts = data.rdd.map(lambda row: row.Amount).collect()
series = data.rdd.map(lambda row: row.CloseDate).collect()

# train on the first 75% of the data and check accuracy on the rest
trainingSize = int(0.75 * len(amounts))
checkingSize = len(amounts) - trainingSize
trainingData = amounts[0:trainingSize]
checkingData = amounts[trainingSize:]
checkingPredicted = predictor(trainingData, checkingSize)
squareError = 0
for i in range(0, checkingSize):
    squareError += (checkingPredicted[i] - checkingData[i])**2
    print int(checkingPredicted[i]), "should be", checkingData[i]
rmse = (squareError / checkingSize)**0.5
print "Root Mean Square Error", int(rmse)

# predict the coming months
predicted = predictor(amounts, period)
month = datetime.strptime(series[0], '%Y-%m')
d = []
for i in amounts:
    d.append((month, i))
    month += relativedelta(months=1)
for i in predicted:
    d.append((month, long(i.item())))
    month += relativedelta(months=1)
df = sparkSession.createDataFrame(d, ['date', 'amount'])
saveData(df)
```
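The accuracy check reports the root mean square error over the $n$ held-out points:

$$\mathrm{RMSE} = \sqrt{\frac{1}{n} \sum_{i=1}^{n} \left(\hat{y}_i - y_i\right)^2}$$

where $y_i$ is the actual amount and $\hat{y}_i$ the predicted amount for held-out month $i$.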

Sample output:

```
Oct 2016    1095653
Nov 2016    1268538
Dec 2016    1416972
Jan 2017    1301205
Feb 2017    1334660
```

## Sales Forecasting using Spark

Both R and Python are single-threaded, which is not suitable for processing (or even loading) large data sets. Previously, we relied on the database to retrieve, aggregate, and sort the data. Here, we will make use of the power of Spark to handle this. Spark SQL provides a SQL interface that converts SQL queries into Spark jobs. Additionally, a third-party ARIMA implementation is available at https://github.com/sryza/spark-timeseries/. In this and the next section, we will use Spark to model and forecast the data.

```scala
import com.cloudera.sparkts.models.ARIMA
import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SparkSession}

object CloseWin {

  def forecast(): Unit = {
    val APP_NAME = "Sales Forecast"
    val period = 6

    val conf = new SparkConf().setAppName(APP_NAME).setMaster("local[2]")
    val sc = new SparkContext(conf)
    val spark = SparkSession
      .builder()
      .appName(APP_NAME)
      .getOrCreate()

    // Load the won, closed opportunities through the JDBC data source
    val dfr = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://my-db-domain:3306/sfdc")
      .option("user", "my-db-username")
      .option("password", "my-db-password")
    val df2 = dfr.option("dbtable",
      "(SELECT CloseDate, Amount FROM toast.opportunity WHERE IsWon='true' AND IsClosed='true') as win").load()
    df2.createOrReplaceTempView("opp")

    // Aggregate the amounts per month, ordered by month
    val df = df2.sqlContext.sql(
      "SELECT DATE_FORMAT(CloseDate,'yyyy-MM') as CloseDate, SUM(Amount) as Amount " +
      "FROM opp GROUP BY DATE_FORMAT(CloseDate,'yyyy-MM') ORDER BY CloseDate")

    val rows = df.collect()
    val months = rows.map((row: Row) => row.getString(0))
    val amounts = rows.map((row: Row) => row.getLong(1).toDouble)

    {
      // Training: fit on the first 75% and check against the remaining 25%
      val trainingSize = (amounts.length * 0.75).toInt
      val trainingAmounts = amounts.slice(0, trainingSize)
      val actual = new DenseVector(trainingAmounts)
      val period = amounts.length - trainingSize
      val model = ARIMA.autoFit(actual)
      println("best-fit model ARIMA(" + model.p + "," + model.d + "," + model.q + ") AIC=" + model.approxAIC(actual))
      val predicted = model.forecast(actual, period)
      var totalErrorSquare = 0.0
      for (i <- (predicted.size - period) until predicted.size) {
        val errorSquare = Math.pow(predicted(i) - amounts(i), 2)
        println(months(i) + ":\t\t" + predicted(i) + "\t should be \t" + amounts(i) + "\t Error Square = " + errorSquare)
        totalErrorSquare += errorSquare
      }
      println("Root Mean Square Error: " + Math.sqrt(totalErrorSquare / period))
    }

    {
      // Prediction: fit on the full series and forecast the next period months
      val actual = new DenseVector(amounts)
      val model = ARIMA.autoFit(actual)
      println("best-fit model ARIMA(" + model.p + "," + model.d + "," + model.q + ") AIC=" + model.approxAIC(actual))
      val predicted = model.forecast(actual, period)
      for (i <- 0 until predicted.size) {
        println("Model Point #" + i + "\t:\t" + predicted(i))
      }
    }
  }
}
```

## Sales Forecasting using PySpark

PySpark is the Python interface for Spark. We can reuse the Python implementation explained above with two changes. First, the util package builds on Spark:

```python
import pyspark

def getSparkSession():
    return pyspark.sql.SparkSession.builder.getOrCreate()

def loadData(sparkSession):
    sparkSQL = sparkSession.read.format("jdbc") \
        .option("url", "jdbc:mysql://my-db-domain:3306/sfdc") \
        .option("dbtable", "sfdc.opportunity") \
        .option("user", "my-db-username") \
        .option("password", "my-db-password") \
        .load()
    sparkSQL.createOrReplaceTempView("opp")
    return sparkSQL.sql_ctx.sql("SELECT DATE_FORMAT(CloseDate,'yyyy-MM') as CloseDate, "
                                "SUM(Amount) as Amount FROM opp "
                                "WHERE IsWon='true' AND IsClosed='true' "
                                "GROUP BY DATE_FORMAT(CloseDate,'yyyy-MM') "
                                "ORDER BY CloseDate")

def saveData(result):
    result.show()
```

Second, the prediction method calls the Spark ARIMA model instead of the default Python implementation:

```python
import pyspark
from pyspark.mllib.common import _py2java, _java2py
from pyspark.mllib.linalg import Vectors

def predict_ARIMA_Spark(amounts, period):
    spark_context = pyspark.SparkContext.getOrCreate()
    # convert the Python series into a JVM vector
    jts = _py2java(spark_context, Vectors.dense(amounts))
    # fit the best ARIMA model on the JVM side
    model = spark_context._jvm.com.cloudera.sparkts.models.ARIMA.autoFit(jts, MAX_P, MAX_D, MAX_Q)
    p = _java2py(spark_context, model.p())
    d = _java2py(spark_context, model.d())
    q = _java2py(spark_context, model.q())
    aic = model.approxAIC(jts)
    print "ARIMA(", p, d, q, ")", "AIC=", aic
    # the forecast vector holds the original series followed by the new points
    jfore = model.forecast(jts, period)
    return _java2py(spark_context, jfore)[len(amounts):]
```
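With these two changes, the driver from the Python section works unchanged apart from calling predict_ARIMA_Spark in place of predictor. A minimal sketch, assuming amounts has been loaded as in that driver:

```python
# Hold out the last 25% of the series and compare it against the forecast
trainingSize = int(0.75 * len(amounts))
checkingSize = len(amounts) - trainingSize
checkingPredicted = predict_ARIMA_Spark(amounts[0:trainingSize], checkingSize)
for i in range(0, checkingSize):
    print int(checkingPredicted[i]), "should be", amounts[trainingSize + i]

# Forecast the next six months from the full series
predicted = predict_ARIMA_Spark(amounts, 6)
```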