Time Series Analysis using Spark

June 29th 2017, 10:30 amCategory: Big Data 0 comments

A time series is a sequence of data points ordered by the time. Time series analysis is a methodology for extracting useful and meaningful information from these data points. Scientific oriented languages such as R supports different time series forecasting models such as ARIMA, HoltWinters and ETS. In an earlier post, we desmonstrated using these models to forecast sales of SalesForce data (SFDC). A major drawback with R is that it is single-threaded, which limits its scalbility. In this post, we will exploite different languages: Python and Spark on Scala and PySpark. For bavarity, we will focus only on ARIMA model.

Sales Forecasting using Python

Unlike R, Python does not support automatic detection of the best ARIMA model. Further, its ARIMA implementation predicts a single data point at a time. The following code handles these limitations
def predict_ARIMA_AUTO(amounts, period):
    warnings.filterwarnings("ignore")  # specify to ignore warning messages
    best_p, best_d, best_q = 0, 1, 0
    best_aic = sys.maxint
    best_history = []
    for p in range(MAX_P):
        for d in range(MAX_D):
            for q in range(MAX_Q):
                    model = ARIMA(np.asarray(amounts, dtype=np.float64), order=(p, d, q))
                    model_fit = model.fit(disp=0)
                    model_aic = model_fit.aic
                    if model_aic < best_aic:
                        # prediction
                        size = len(amounts)
                        history = amounts[0:size]
                        for t in range(0, period):
                            model = ARIMA(np.asarray(history, dtype=np.float64), order=(p, d, q))
                            model_fit = model.fit(disp=0)
                            aic = model_fit.aic
                            bic = model_fit.bic
                            output = model_fit.forecast()
                        best_history = history
                        best_p, best_d, best_q = p, d, q
                        best_aic = model_aic
    print "ARIMA(", best_p, best_d, best_q, ")", "AIC=", best_aic
    return best_history[len(amounts):]
The code tries out different combinations of ARIMA parameters (p, d & q) at lines 7-9, and pick the best mode. The best ARIMA model for given data-set is the one with the lowest AIC parameter. In order to resolve the single point prediction, we append the predicted point to the given data-set, and re-predict again. This incremental prediction allowed us to predict N point instead the default Python behavior.

The following code is similar to the R code illustrated before, which forecast SFDC data. 
from datetime import datetime
from dateutil.relativedelta import relativedelta
from util import *
period = 6                  # months
# retrieve data
sparkSession = getSparkSession()
data = loadData(sparkSession)
amounts = data.rdd.map(lambda row: row.Amount).collect()
series = data.rdd.map(lambda row: row.CloseDate).collect()
# train and check accuracy
trainingSize = int(0.75 * len(amounts))
checkingSize = len(amounts) - trainingSize
trainingData = amounts[0:trainingSize]
checkingData = amounts[trainingSize:]
checkingPredicted = predictor(trainingData, checkingSize)
squareError = 0
for i in range(0, checkingSize):
    squareError += (checkingPredicted[i] - checkingData[i])**2
    print int(checkingPredicted[i]), "should be", checkingData[i]
mse = squareError**(1/2.0)
print "Root Square Error", int(mse)
# prediction
predicted = predictor(amounts, period)
month = datetime.strptime(series[0], '%Y-%m')
d = []
for i in amounts:
    d.append((month, i))
    month += relativedelta(months=1)
for i in predicted:
    d.append((month, long(i.item())))
    month += relativedelta(months=1)
df = sparkSession.createDataFrame(d, ['date', 'amount'])
The code is self-explanatory and gives the same logic described before. Here we encapsulated the loading of data from database and saving the results in a custom util package. The training detects ARIMA model with the following parameters, and root mean square error = 1500959
ARIMA( 0 1 0 ) AIC= 687.049546173
The final model using the full data-set is with the following parameters
ARIMA( 2 1 0 ) AIC= 967.844839706
and it gives the following predictions
Sep 2016  1361360
Oct 2016  1095653
Nov 2016  1268538
Dec 2016  1416972
Jan 2017  1301205
Feb 2017  1334660

Sales Forecasting using Spark

Both R and Python is single threaded, which is not suitable for processing (or loading) large data. Previously, we count on the DB to retrieve, aggregate and sort the data. Here, we will get use of the power of Spark to handle this. SparkSQL introduces SQL interface for converting SQL queries into Spark jobs. Additionally, a third party implementation for ARIMA is available at https://github.com/sryza/spark-timeseries/. In this and next section we will use Spark to model and forecast the data.
import com.cloudera.sparkts.models.ARIMA
import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SparkSession}
object CloseWin {
  def forecast(): Unit ={
    val APP_NAME = "Sales Forecast"
    val period = 6
    val conf = new SparkConf().setAppName(APP_NAME).setMaster("local[2]")
    val sc = new SparkContext(conf)
    val spark = SparkSession
    var dfr = spark.read
      .option("url", "jdbc:mysql://my-db-domain:3306/sfdc")
      .option("user", "my-db-username")
      .option("password", "my-db-password")
    val df2 = dfr.option("dbtable","(SELECT CloseDate, Amount FROM toast.opportunity WHERE IsWon='true' AND IsClosed='true') as win").load()
    val df = df2.sqlContext.sql("SELECT DATE_FORMAT(CloseDate,'yyyy-MM') as CloseDate, SUM(Amount) as Amount FROM opp GROUP BY DATE_FORMAT(CloseDate,'yyyy-MM') ORDER BY CloseDate")
    val monthes = df.collect().flatMap((row: Row) => Array(row.get(0)))
    val amounts = df.collect().flatMap((row: Row) => Array(row.getLong(1).intValue().toDouble))
      // Training
      val trainingSize = (amounts.length * 0.75).toInt
      val trainingAmounts = new Array[Double](trainingSize)
      for(i <- 0 until trainingSize){
        trainingAmounts(i) = amounts(i)
      val actual = new DenseVector(trainingAmounts)
      val period = amounts.length - trainingSize
      val model = ARIMA.autoFit(actual)
      println("best-fit model ARIMA(" + model.p + "," + model.d + "," + model.q + ") AIC=" + model.approxAIC(actual) )
      val predicted = model.forecast(actual, period)
      var totalErrorSquare = 0.0
      for (i <- (predicted.size - period) until predicted.size) {
        val errorSquare = Math.pow(predicted(i) - amounts(i), 2)
        println(monthes(i) + ":\t\t" + predicted(i) + "\t should be \t" + amounts(i) + "\t Error Square = " + errorSquare)
        totalErrorSquare += errorSquare
      println("Root Mean Square Error: " + Math.sqrt(totalErrorSquare/period))
      // Prediction
      val actual = new DenseVector(amounts)
      val model = ARIMA.autoFit(actual)
      println("best-fit model ARIMA(" + model.p + "," + model.d + "," + model.q + ")  AIC=" + model.approxAIC(actual)  )
      val predicted = model.forecast(actual, period)
      for (i <- 0 until predicted.size) {
        println("Model Point #" + i + "\t:\t" + predicted(i))

Sales Forecasting using PySpark

PySpark is a Python interface for Spark. We can use the Python implementation explained before but we will change the following:

The util package to get use of Spark
def getSparkSession():
    return pyspark.sql.SparkSession.builder.getOrCreate()
def loadData(sparkSession):
    sparkSQL = sparkSession.read.format("jdbc").option("url", "jdbc:mysql://my-db-domain:3306/sfdc").option("dbtable", "sfdc.opportunity").option("user", "my-db-username").option("password", "my-db-password").load()
    return sparkSQL.sql_ctx.sql("SELECT DATE_FORMAT(CloseDate,'yyyy-MM') as CloseDate, SUM(Amount) as Amount FROM opp WHERE IsWon='true' AND IsClosed='true' GROUP BY DATE_FORMAT(CloseDate,'yyyy-MM') ORDER BY CloseDate")
def saveData(result):
The prediction method to use Spark ARIMA instead the default python implementation

def predict_ARIMA_Spark(amounts, period):
    spark_context = pyspark.SparkContext.getOrCreate()
    model = spark_context._jvm.com.cloudera.sparkts.models.ARIMA.autoFit(_py2java(spark_context, Vectors.dense(amounts)), MAX_P, MAX_D, MAX_Q)
    p = _java2py(spark_context, model.p())
    d = _java2py(spark_context, model.d())
    q = _java2py(spark_context, model.q())
    jts = _py2java(spark_context, Vectors.dense(amounts))
    aic = model.approxAIC(jts)
    print "ARIMA(", p, d, q, ")", "AIC=", aic
    jfore = model.forecast(jts, period)
    return _java2py(spark_context, jfore)[len(amounts):]
Unfortunately, the third party implementation for Spark on Python is not native. It just delegates the calls to Java and exploits Py4J to wire Python with Java.

Sales Forecasting using R

June 29th 2017, 9:53 amCategory: Big Data 0 comments

Sales forecasting is the process of estimating future sales and revenue in order to enable companies to make informed business decisions and predict short-term and long-term performance. Companies can base their forecasts on past sales data, industry-wide comparisons, and economic trends. The problem of sales forecasting can be classified as a time-series forecasting, because the time is the domain in which the data (sales or revenue) got changed.

Time Series Analysis

A time series is a sequence of data points ordered by the time. Time series analysis is a methodology for extracting useful and meaningful information from these data points. Any time series can be decomposed into three components:
  • Trend: it means the regression of the data points with time. For example, a time series with a positive trend means that the values of the data points at (t+n) is larger than the ones at time (t). Here the value of the data dependes on the Time rather than the previous values.
  • Seasonality (Cycle): it means the repetition of the data over the time domain. In other words, the data values at time (t+n) is the same as the data at time (t), where n is the seasonality or cycle length
  • Noise (Random Walk): this is a time independent component (non-systematic) that is added (or subtracted) to the data points.

Based on this we can classify the time series into two classes:
  • Non-Stationary: data points with means or variance and covariance that change over the time. This is interpreted as trends or cycles or combination of them. 
  • Stationary: data points that its means and variance and covariance does not change over the time

The theories behind non-stationary signals and forecasting is not mature and modeling it is complex, which leads to inaccurate results. Luckily, non-stationary series can be transformed into stationary using common techniques (e.g. differentiation). The idea of differentiation is to subtract the data value from its predecessor, so the new series will lose a component of the time-dep

Sales-Force Forecasting

Salesforce.com (abbreviated as SF or SFDC) is a could computing company that purchase customer relationship management (CRM) products. Salesforce.com's CRM service is broken down into several broad categories: Sales Cloud, Service Cloud, Data Cloud, Marketing Cloud, Community Cloud, Analytics Cloud, App Cloud, and IoT with over 100,000 customers.
In this post we will analyze and forecast a sample sales data from salesforce CRM that shows the sales grows between 2013 and 2016, and we will predict the sales values for two business quarters. We will use two models for forecasting: ARIMA and HoltWinters, and will demonstrate how to do that using R language.
The methodology that we will follow is:
  1. Aggregate the data per month
  2. Construct the model using 75% of the data as training set
  3. Check the model accuracy using 25% of the data, and calculate the root mean square error
  4. Forecast the data for the next two quarters
We assume the data is stored at "opportunity" table, and we are interested in the following fields:
  • CloseDate: date of closing the oppertunity (Measure field)
  • Amount: the monotary amount optained
  • IsWon and IsClosed: flags for if the opportunity win/lost and closed/opened
The following plots shows the Amount vs the CloseDate, and the decomposition of the time series into trend, seasonality and residuals. 

Sales Forecasting using R

We need to install two packages: RJDBC for connecting to DB and retrieve the data, and forecast for data modeling, analysis and forecasting. The following R script shows the sales forecasting using ARMIA.
ARIMA model is an abbreviation for Autoregressive Integrated Moving Average, so it is a combination of multiple techniques:
  • Auto-regression (AR)
  • Integration (I)
  • Moving Average (MA)

rmse <- function(sim, obs){
  return(sqrt(mean((sim - obs)^2, na.rm = TRUE)))

construct_model <- function(data){
  data.start = strsplit(data$CloseDate[1], "-")
  data.end = strsplit(data$CloseDate[nrow(data)], "-")
  data.ts = ts(data$Amount, 
                frequency = 12)
  model = auto.arima(data.ts)

get_forecast_model <- function(close.win.opp){
  # Train with 75% of data   
  N = ceiling(0.75*nrow(close.win.opp))
  train.data = close.win.opp[1:N,]
  model = construct_model(train.data)

  # Test with 25% of data   
  test.data = close.win.opp[(N+1):nrow(close.win.opp),]
  predicted = forecast(model, length(test.data$Amount))
  cat("RMSE=", rmse(predicted$mean, test.data$Amount), "\n")
  # Train with all data 
  model = construct_model(close.win.opp)

drv <- JDBC("com.mysql.jdbc.Driver",  classPath="./mysql-connector-java-5.1.41-bin.jar")
conn <- dbConnect(drv, "jdbc:mysql://my-db-domain:3306/sfdc", "my-db-username", "my-db-password")

close.win.opp = dbGetQuery(conn, "SELECT DATE_FORMAT(CloseDate,'%Y-%m') as CloseDate, SUM(Amount) as Amount FROM opportunity WHERE IsWon='true' AND IsClosed='true' And CloseDate < '2016-09-01' GROUP BY DATE_FORMAT(CloseDate,'%Y-%m') ORDER BY CloseDate")

model = get_forecast_model(close.win.opp)
predicted = forecast(model, 6)

The script splits the data-set into training data (75%) and verification data (25%). Next, it build the model based on the training data. R has an implementation for ARIMA model featured with automatic detection of parameters. For the before mentioned SFDC dataset, we obtained the following model, with root mean square error = 233560
sigma^2 estimated as 4.254e+10:  log likelihood=-218.49
AIC=438.98   AICc=439.27   BIC=439.76
Next, we build a model using all the data-set, the obtained model is
sigma^2 estimated as 5.288e+10:  log likelihood=-343.82
AIC=691.64   AICc=692.18   BIC=694.07
Finally, we forecast the next 6 months using this model at Line 45. The data forecasting is
         Point      Forecast   Lo 80   Hi 80     Lo 95   Hi 95
Sep 2016        1894667 1599973 2189362 1443970.6 2345364
Oct 2016        1520401 1201883 1838919 1033270.3 2007532
Nov 2016        1545809 1205130 1886488 1024785.6 2066833
Dec 2016        1477773 1116289 1839257  924930.8 2030616
Jan 2017        1517143 1135988 1898299  934216.2 2100070
Feb 2017        1825764 1425904 2225624 1214230.9 2437297

R supports different time series forecasting models. In the code above, you can easily change the forecasting model by changing Line 18. Fore example to use HoltWinters model change the code to
model = HoltWinters(data.ts)
The root mean square error was 301992.9, and the predictions were in this case
         Point      Forecast   Lo 80   Hi 80     Lo 95   Hi 95
Sep 2016        1760990 1507244 2014735 1372919.6 2149059
Oct 2016        1363874 1107298 1620450  971474.8 1756273
Nov 2016        1381785 1120706 1642865  982498.9 1781072
Dec 2016        1321675 1054109 1589240  912468.3 1730881
Jan 2017        1376777 1100499 1653055  954246.8 1799307
Feb 2017        1687538 1400158 1974919 1248028.3 2127049

As we see, using HoltWinters model for our data is more appropriate than ARMIA (less mean square error)