A time series is a sequence of data points ordered by time. Time series analysis is a methodology for extracting useful and meaningful information from these data points. Scientifically oriented languages such as R support different time series forecasting models such as ARIMA, HoltWinters and ETS. In an earlier post, we demonstrated using these models to forecast sales from Salesforce data (SFDC). A major drawback of R is that it is single-threaded, which limits its scalability. In this post, we will explore different alternatives: Python, Spark on Scala, and PySpark. For brevity, we will focus only on the ARIMA model.
Sales Forecasting using Python
Unlike R, Python does not support automatic detection of the best ARIMA model. Moreover, its ARIMA implementation predicts a single data point at a time. The following code handles these limitations:
import sys
import warnings
import numpy as np
from statsmodels.tsa.arima_model import ARIMA

# MAX_P, MAX_D, MAX_Q bound the grid search over (p, d, q); assumed to be defined elsewhere (e.g. in util)
def predict_ARIMA_AUTO(amounts, period):
    # auto-fitting
    warnings.filterwarnings("ignore")  # specify to ignore warning messages
    best_p, best_d, best_q = 0, 1, 0
    best_aic = sys.maxint
    best_history = []
    for p in range(MAX_P):
        for d in range(MAX_D):
            for q in range(MAX_Q):
                try:
                    model = ARIMA(np.asarray(amounts, dtype=np.float64), order=(p, d, q))
                    model_fit = model.fit(disp=0)
                    model_aic = model_fit.aic
                    if model_aic < best_aic:
                        # prediction: append each forecasted point and re-fit
                        size = len(amounts)
                        history = amounts[0:size]
                        for t in range(0, period):
                            model = ARIMA(np.asarray(history, dtype=np.float64), order=(p, d, q))
                            model_fit = model.fit(disp=0)
                            aic = model_fit.aic
                            bic = model_fit.bic
                            output = model_fit.forecast()
                            history.append(output[0][0])
                        best_history = history
                        best_p, best_d, best_q = p, d, q
                        best_aic = model_aic
                except:
                    continue
    print "ARIMA(", best_p, best_d, best_q, ")", "AIC=", best_aic
    return best_history[len(amounts):]
The three nested loops try out different combinations of the ARIMA parameters (p, d and q) and pick the best model. The best ARIMA model for a given data set is the one with the lowest AIC (Akaike Information Criterion, which balances goodness of fit against model complexity). To work around the single-point prediction, we append each predicted point to the data set and predict again. This incremental prediction lets us predict N points instead of the single point Python returns by default.
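As a quick illustration, the function can be exercised on a short synthetic series. The following is a minimal sketch, where MAX_P, MAX_D and MAX_Q are hypothetical grid-search bounds and the monthly amounts are made-up numbers:
# hypothetical search bounds and an invented monthly series, for illustration only
MAX_P, MAX_D, MAX_Q = 4, 2, 4
monthly_amounts = [1200000.0, 980000.0, 1150000.0, 1300000.0, 1250000.0, 1100000.0,
                   1350000.0, 1280000.0, 1400000.0, 1320000.0, 1450000.0, 1380000.0]
# forecast the next 3 months using the incremental ARIMA search above
forecasted = predict_ARIMA_AUTO(monthly_amounts, 3)
for value in forecasted:
    print int(value)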
The following code is similar to the R code illustrated before, forecasting the SFDC data:
from datetime import datetime
from dateutil.relativedelta import relativedelta
from util import *

period = 6  # months
predictor = predict_ARIMA_AUTO  # use the auto-fitting ARIMA function defined above

# retrieve data
sparkSession = getSparkSession()
data = loadData(sparkSession)
amounts = data.rdd.map(lambda row: row.Amount).collect()
series = data.rdd.map(lambda row: row.CloseDate).collect()

# train and check accuracy
trainingSize = int(0.75 * len(amounts))
checkingSize = len(amounts) - trainingSize
trainingData = amounts[0:trainingSize]
checkingData = amounts[trainingSize:]
checkingPredicted = predictor(trainingData, checkingSize)
squareError = 0
for i in range(0, checkingSize):
    squareError += (checkingPredicted[i] - checkingData[i])**2
    print int(checkingPredicted[i]), "should be", checkingData[i]
mse = squareError**(1/2.0)  # root of the accumulated squared error over the checking window
print "Root Square Error", int(mse)

# prediction
predicted = predictor(amounts, period)
month = datetime.strptime(series[0], '%Y-%m')
d = []
for i in amounts:
    d.append((month, i))
    month += relativedelta(months=1)
for i in predicted:
    d.append((month, long(i.item())))
    month += relativedelta(months=1)
df = sparkSession.createDataFrame(d, ['date', 'amount'])
saveData(df)
The code is self-explanatory and follows the same logic described before. Here we encapsulated loading the data from the database and saving the results in a custom util package. The training detects an ARIMA model with the following parameters, and a root mean square error of 1500959:
ARIMA( 0 1 0 ) AIC= 687.049546173
The final model, fitted on the full data set, has the following parameters:
ARIMA( 2 1 0 ) AIC= 967.844839706
and it gives the following predictions:
Sep 2016 1361360
Oct 2016 1095653
Nov 2016 1268538
Dec 2016 1416972
Jan 2017 1301205
Feb 2017 1334660
Sales Forecasting using Spark
Both R and Python are single-threaded, which makes them unsuitable for processing (or loading) large data sets. Previously, we counted on the database to retrieve, aggregate and sort the data; here, we make use of the power of Spark to handle this. Spark SQL provides an SQL interface that converts SQL queries into Spark jobs. Additionally, a third-party ARIMA implementation is available at https://github.com/sryza/spark-timeseries/. In this section and the next, we use Spark to model and forecast the data.
import com.cloudera.sparkts.models.ARIMA
import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SparkSession}

object CloseWin {
  def forecast(): Unit = {
    val APP_NAME = "Sales Forecast"
    val period = 6
    val conf = new SparkConf().setAppName(APP_NAME).setMaster("local[2]")
    val sc = new SparkContext(conf)
    val spark = SparkSession
      .builder()
      .appName(APP_NAME)
      .getOrCreate()
    var dfr = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://my-db-domain:3306/sfdc")
      .option("user", "my-db-username")
      .option("password", "my-db-password")
    val df2 = dfr.option("dbtable", "(SELECT CloseDate, Amount FROM toast.opportunity WHERE IsWon='true' AND IsClosed='true') as win").load()
    df2.createOrReplaceTempView("opp")
    val df = df2.sqlContext.sql("SELECT DATE_FORMAT(CloseDate,'yyyy-MM') as CloseDate, SUM(Amount) as Amount FROM opp GROUP BY DATE_FORMAT(CloseDate,'yyyy-MM') ORDER BY CloseDate")
    val monthes = df.collect().flatMap((row: Row) => Array(row.get(0)))
    val amounts = df.collect().flatMap((row: Row) => Array(row.getLong(1).intValue().toDouble))
    {
      // Training
      val trainingSize = (amounts.length * 0.75).toInt
      val trainingAmounts = new Array[Double](trainingSize)
      for (i <- 0 until trainingSize) {
        trainingAmounts(i) = amounts(i)
      }
      val actual = new DenseVector(trainingAmounts)
      val period = amounts.length - trainingSize
      val model = ARIMA.autoFit(actual)
      println("best-fit model ARIMA(" + model.p + "," + model.d + "," + model.q + ") AIC=" + model.approxAIC(actual))
      val predicted = model.forecast(actual, period)
      var totalErrorSquare = 0.0
      for (i <- (predicted.size - period) until predicted.size) {
        val errorSquare = Math.pow(predicted(i) - amounts(i), 2)
        println(monthes(i) + ":\t\t" + predicted(i) + "\t should be \t" + amounts(i) + "\t Error Square = " + errorSquare)
        totalErrorSquare += errorSquare
      }
      println("Root Mean Square Error: " + Math.sqrt(totalErrorSquare / period))
    }
    {
      // Prediction
      val actual = new DenseVector(amounts)
      val model = ARIMA.autoFit(actual)
      println("best-fit model ARIMA(" + model.p + "," + model.d + "," + model.q + ") AIC=" + model.approxAIC(actual))
      val predicted = model.forecast(actual, period)
      for (i <- 0 until predicted.size) {
        println("Model Point #" + i + "\t:\t" + predicted(i))
      }
    }
  }
}
Sales Forecasting using PySpark
PySpark is the Python interface for Spark. We can use the Python implementation explained before, but we will change the following. First, the util package is modified to make use of Spark:
import pyspark

def getSparkSession():
    return pyspark.sql.SparkSession.builder.getOrCreate()

def loadData(sparkSession):
    sparkSQL = sparkSession.read.format("jdbc").option("url", "jdbc:mysql://my-db-domain:3306/sfdc").option("dbtable", "sfdc.opportunity").option("user", "my-db-username").option("password", "my-db-password").load()
    sparkSQL.createOrReplaceTempView("opp")
    return sparkSQL.sql_ctx.sql("SELECT DATE_FORMAT(CloseDate,'yyyy-MM') as CloseDate, SUM(Amount) as Amount FROM opp WHERE IsWon='true' AND IsClosed='true' GROUP BY DATE_FORMAT(CloseDate,'yyyy-MM') ORDER BY CloseDate")

def saveData(result):
    result.show()
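With this in place, the driver script from the Python section runs unchanged. As a quick check, the Spark-backed util functions could also be exercised on their own; a minimal sketch, with illustrative output only:
# quick, illustrative check of the Spark-backed util functions
spark = getSparkSession()
monthly = loadData(spark)   # one row per month: (CloseDate, Amount)
monthly.show(3)             # prints the first three monthly totals
saveData(monthly)           # currently just displays the DataFrame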
Second, the prediction method is changed to use the Spark ARIMA implementation instead of the default Python one:
import pyspark
from pyspark.mllib.common import _py2java, _java2py
from pyspark.mllib.linalg import Vectors

def predict_ARIMA_Spark(amounts, period):
    spark_context = pyspark.SparkContext.getOrCreate()
    # call the spark-timeseries (sparkts) ARIMA implementation through the JVM gateway
    model = spark_context._jvm.com.cloudera.sparkts.models.ARIMA.autoFit(_py2java(spark_context, Vectors.dense(amounts)), MAX_P, MAX_D, MAX_Q)
    p = _java2py(spark_context, model.p())
    d = _java2py(spark_context, model.d())
    q = _java2py(spark_context, model.q())
    jts = _py2java(spark_context, Vectors.dense(amounts))
    aic = model.approxAIC(jts)
    print "ARIMA(", p, d, q, ")", "AIC=", aic
    jfore = model.forecast(jts, period)
    return _java2py(spark_context, jfore)[len(amounts):]
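Switching the driver script over to the Spark-backed model then only requires pointing the predictor variable at the new function. The following is a minimal sketch, assuming the sparkts jar is on the classpath and the search bounds MAX_P, MAX_D and MAX_Q are defined as before:
# swap the pure-Python ARIMA for the Spark-backed one; the rest of the script is unchanged
predictor = predict_ARIMA_Spark

sparkSession = getSparkSession()
data = loadData(sparkSession)
amounts = data.rdd.map(lambda row: row.Amount).collect()

# forecast the next 6 months
for value in predictor(amounts, 6):
    print int(value)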