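17. Time Series Forecasting with SparkTS
I have done time series forecasting work before, but with pyspark + fbprophet (I will write up my notes on pyspark + fbprophet another time). This time I use the HoltWinters model in sparkts to forecast revenue for many merchants in one batch.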
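1. Add the sparkts Maven coordinates
sparkts does not seem to have been updated for a long time, and Spark ML is in fact still less mainstream than other commonly used machine learning platforms.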
<dependency>
    <groupId>com.cloudera.sparkts</groupId>
    <artifactId>sparkts</artifactId>
    <version>0.4.1</version>
</dependency>
2. Prepare the parameters
import java.time.{Instant, ZoneId, ZonedDateTime}
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
// Period length: an important seasonality parameter of the HoltWinters model (I read it as the cycle length). Here one cycle is a 7-day week.
val period: Int = conf.getInt("spark.hw.period", 7)
// HoltWinters model type: additive, or Multiplicative (the commonly used one)
val holtWintersModelType: String = conf.get("spark.hw.modeltype", "Multiplicative")
val zone: ZoneId = ZoneId.systemDefault()
// The start and end time of the training data must be given as ZonedDateTime values
private val startTimeStr: String = conf.get("spark.start.time", new DateTime().plusMonths(-6).toString("yyyy-MM-dd"))
val startTime: DateTime = DateTime.parse(startTimeStr, DateTimeFormat.forPattern("yyyy-MM-dd"))
val startZonedDateTime: ZonedDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(startTime.getMillis), zone)
private val endTimeStr: String = conf.get("spark.end.time", new DateTime().plusDays(-1).toString("yyyy-MM-dd"))
val endTime: DateTime = DateTime.parse(endTimeStr, DateTimeFormat.forPattern("yyyy-MM-dd"))
val endZonedDateTime: ZonedDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(endTime.getMillis), zone)
// Merchant IDs
private val lotIds: String = conf.get("spark.lot.filter", "123,456").trim()
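To make the role of the period parameter more concrete, here is a minimal standalone sketch of additive Holt-Winters smoothing. It is not the sparkts implementation: the object name HoltWintersSketch, the initialization from the first two periods, and the fixed alpha/beta/gamma values are assumptions made for illustration only (sparkts fits the three smoothing parameters with an optimizer).

```scala
object HoltWintersSketch {
  // Additive Holt-Winters. Returns `horizon` forecasts past the end of `ys`.
  // Assumption: level, trend and seasonal offsets are initialized from the
  // first two full periods, which is one simple choice among several.
  def forecastAdditive(ys: Array[Double], period: Int, alpha: Double, beta: Double,
                       gamma: Double, horizon: Int): Array[Double] = {
    require(period > 0 && ys.length >= 2 * period, "need at least two full periods")
    val firstMean = ys.take(period).sum / period
    val secondMean = ys.slice(period, 2 * period).sum / period
    var level = firstMean
    var trend = (secondMean - firstMean) / period
    // One seasonal offset per position in the cycle (e.g. per weekday when period = 7).
    val season = Array.tabulate(period)(i => ys(i) - firstMean)
    for (t <- period until ys.length) {
      val s = season(t % period)
      val prevLevel = level
      level = alpha * (ys(t) - s) + (1 - alpha) * (prevLevel + trend)
      trend = beta * (level - prevLevel) + (1 - beta) * trend
      season(t % period) = gamma * (ys(t) - level) + (1 - gamma) * s
    }
    // Forecast h + 1 steps ahead: extrapolate the trend, add the matching seasonal offset.
    Array.tabulate(horizon)(h => level + (h + 1) * trend + season((ys.length + h) % period))
  }
}
```

With period = 7 and daily data, each forecast reuses the seasonal offset learned for the same weekday, which is why the period has to match the real cycle in the data.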
3. Load the historical data and split it into training and test/validation sets
def main(args: Array[String]): Unit = {
  .....
  val data = getLotDayReportHistoryData()
  // Use the last day's data as the test/validation set and everything before it as training data
  val testDF = data.where(s"flow_date='${endTimeStr}'")
  val trainDF = data.where(s"flow_date<'${endTimeStr}'")
  .....
}
def getLotDayReportHistoryData(): DataFrame = {
  val lotFilter = if (lotIds.contains("all")) {
    "1=1"
  } else {
    val lotIdArr = lotIds.split(",")
    s"lot_id in (${lotIdArr.map("'" + _ + "'").mkString(",")})"
  }
  // The time column of the series must be cast to Timestamp and the label column to Double.
  // It is especially important to sort by time in ascending order.
  spark.sql(s"select lot_id,flow_date ,total_money from data_table where $lotFilter and flow_date >='${startTimeStr}' and flow_date <='${endTimeStr}' order by lot_id,flow_date asc")
    .withColumn("flow_date", $"flow_date".cast(DataTypes.TimestampType))
    .withColumn("total_money", $"total_money".cast(DataTypes.DoubleType))
    .withColumn("is_holiday", isHoliday2($"flow_date", lit("yyyy-MM-dd HH:mm:ss")))
}
}
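The merchant filter in getLotDayReportHistoryData splices configuration text directly into SQL. Here is a minimal sketch of the same logic as a small pure function that is easier to test. The object name LotFilterSketch and the rule that IDs contain only letters, digits, or underscores are assumptions of this sketch, not something the original code enforces.

```scala
object LotFilterSketch {
  // Builds the WHERE-clause fragment for the merchant filter.
  // As in the original code, "all" anywhere in the input disables filtering.
  def build(lotIds: String): String =
    if (lotIds.contains("all")) "1=1"
    else {
      val ids = lotIds.split(",").map(_.trim).filter(_.nonEmpty)
      require(ids.nonEmpty, "no lot ids given")
      // Assumption: IDs are simple tokens; reject anything else instead of
      // splicing it into the SQL string.
      require(ids.forall(_.matches("[A-Za-z0-9_]+")), s"unexpected lot id in: $lotIds")
      ids.map(id => s"'$id'").mkString("lot_id in (", ",", ")")
    }
}
```

For example, LotFilterSketch.build("123,456") produces the same fragment as the original code for the default configuration value.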
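4. Specify the time span of the training data
This index is used to create the TimeSeriesRDD, which in turn helps with filling in and filtering the data. The third argument of DateTimeIndex.uniformFromInterval() is the step between consecutive index points; mine is 7 days. Pick a step that fits your business needs.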
val dtIndex: UniformDateTimeIndex = DateTimeIndex.uniformFromInterval(startZonedDateTime, endZonedDateTime, new DayFrequency(7))
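As a rough illustration of what a uniform date index does, the sketch below builds an evenly spaced list of dates and lines observations up against it, leaving NaN where a date has no value. It uses only java.time and plain collections. The names UniformIndexSketch, uniformDates, and align are hypothetical, the inclusive end date is an assumption of the sketch, and it is not how sparkts implements its index.

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

object UniformIndexSketch {
  // All dates from `start` up to `end` (inclusive when it falls on the grid),
  // spaced `stepDays` apart.
  def uniformDates(start: LocalDate, end: LocalDate, stepDays: Int): IndexedSeq[LocalDate] = {
    require(stepDays > 0, "stepDays must be positive")
    val steps = ChronoUnit.DAYS.between(start, end) / stepDays
    (0L to steps).map(i => start.plusDays(i * stepDays))
  }

  // Places observations on the index. Index dates without a value become NaN;
  // observations whose date is not on the index are ignored in this sketch.
  def align(index: IndexedSeq[LocalDate], obs: Map[LocalDate, Double]): Array[Double] =
    index.map(d => obs.getOrElse(d, Double.NaN)).toArray
}
```

In this sketch a 7-day step keeps one point per week, so the step decides how many observations end up on the grid and how many gaps have to be filled later.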
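5. Train the model and predict
Ideally every merchant would have revenue data for the whole time span I configured, and training and forecasting would go smoothly. In practice, with a year of training data and tens of thousands of merchants, many merchants can end up with Double.NaN as their forecast, or so much of their data is filled in automatically that the forecast is distorted. So I first run everything through the normal path once, then pull out the merchants that got no forecast and train and predict them again with the HoltWinters model on a plain RDD[(key, Vector)].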
def main(args: Array[String]): Unit = {
  .....
  val data = getLotDayReportHistoryData()
  // Use the last day's data as the test/validation set and everything before it as training data
  val testDF = data.where(s"flow_date='${endTimeStr}'")
  val trainDF = data.where(s"flow_date<'${endTimeStr}'")
  val dtIndex: UniformDateTimeIndex = DateTimeIndex.uniformFromInterval(startZonedDateTime, endZonedDateTime, new DayFrequency(7))
  /**
   * 3. Train and predict. The core of batch training across merchants is to put each merchant's
   * data into one DenseVector (already sorted); the HoltWinters model then predicts the next N
   * values from that sequence.
   */
  val result_1 = processStepOne(dtIndex, trainDF, testDF)
  result_1.persist()
  println("result_1 total: " + result_1.count())
  // Merchants with no forecast from the first pass; keep those with at least 12 observations.
  // Note: collect_list does not guarantee element order after a shuffle, so check that the
  // collected values are still in ascending time order.
  val lessDateLotDF = trainDF.join(
    result_1, Seq("lot_id"), "left_anti"
  ).groupBy("lot_id")
    .agg(collect_list("total_money").as("values"))
    .where(size($"values") >= 12)
  lessDateLotDF.persist()
  lessDateLotDF.show(5, false)
  println("lessDateLotDF total: " + lessDateLotDF.count())
  val trainRdd: RDD[(String, DenseVector)] = lessDateLotDF
    .rdd
    .map(row => {
      val series = row.getAs[mutable.WrappedArray[Double]]("values").toArray
      (row.getAs[String]("lot_id"), new DenseVector(series))
    })
  val result_2 = processStepTwo(trainRdd, testDF)
  val result = result_1.unionByName(result_2)
  result.persist()
  result.show(false)
  .....
}
// First pass: train the model on a TimeSeriesRDD
def processStepOne(dtIndex: UniformDateTimeIndex, trainDF: DataFrame, testDF: DataFrame): DataFrame = {
  val trainTsRdd = TimeSeriesRDD.timeSeriesRDDFromObservations(dtIndex, trainDF, "flow_date", "lot_id", "total_money")
    .filterStartingBefore(startZonedDateTime)
  trainTsRdd.persist()
  // Fill missing values. "linear" is linear interpolation; there are several other methods
  // (trend fill, zero fill, previous value, next value). I will note them down when I write up the source code.
  val filledTrainTsRdd: TimeSeriesRDD[String] = trainTsRdd.fill("linear")
  val forecast = holtWintersModelTrainKey(1, filledTrainTsRdd, period, holtWintersModelType)
  processPredictResult(forecast, testDF)
}
// Second pass: train the model on a plain RDD
def processStepTwo(trainRdd: RDD[(String, DenseVector)], testDF: DataFrame): DataFrame = {
  val forecast = holtWintersModelTrainKey2(1, trainRdd, period, holtWintersModelType)
  processPredictResult(forecast, testDF)
}
// Joins the test data with the predictions. Only one day is predicted here, so each merchant has
// exactly one predicted value, which is why .withColumn("flow_date", lit(endTimeStr).cast(DataTypes.TimestampType)) works.
// When predicting several values, the test data and the predictions cannot be joined this way.
def processPredictResult(forecast: RDD[(String, Array[Double])], testDF: DataFrame): DataFrame = {
  val predictDF = forecast.toDF("lot_id", "fs")
    .select($"lot_id", explode($"fs").as("predict"))
    .withColumn("predict", bround($"predict"))
    .where("predict is not null and predict!='NaN' ")
    .withColumn("flow_date", lit(endTimeStr).cast(DataTypes.TimestampType))
  val result = testDF.join(
    predictDF, Seq("lot_id", "flow_date"), "right"
  )
  result
}
/**
 * Produce the forecasts
 */
def modelPredict(predictedN: Int, holtWintersAndVectorRdd: RDD[(String, HoltWintersModel, DenseVector)]
                 , period: Int, holtWintersModelType: String): (RDD[(String, Array[Double])]) = {
  /** Predict the next N values */
  // Build a vector of N values to pass to the HoltWinters forecast method
  val predictedArrayBuffer = new ArrayBuffer[Double]()
  var i = 0
  while (i < predictedN) {
    predictedArrayBuffer += i
    i = i + 1
  }
  val predictedVectors = Vectors.dense(predictedArrayBuffer.toArray)
  // Predict
  val forecast: RDD[(String, Array[Double])] = holtWintersAndVectorRdd.mapPartitions { rows =>
    rows.map {
      case (key, holtWintersModel, denseVector) => {
        val vector = holtWintersModel.forecast(denseVector, predictedVectors)
        // Copy the result so rows never share one backing array
        // (forecast may write into the vector that was passed in).
        (key, vector.toArray.clone())
      }
    }
  }
  forecast
}
/**
 * Train the model on a TimeSeriesRDD
 */
def holtWintersModelTrainKey(predictedN: Int, trainTsRdd: TimeSeriesRDD[String], period: Int, holtWintersModelType: String): RDD[(String, Array[Double])] = {
  // Create and fit the HoltWinters model. The resulting RDD elements are (key, HoltWintersModel, Vector).
  val holtWintersAndVectorRdd = trainTsRdd.mapPartitions { lines => {
    lines.map {
      case (key, denseVector: DenseVector) =>
        (key, HoltWinters.fitModel(denseVector, period, holtWintersModelType), denseVector)
    }
  }
  }
  modelPredict(predictedN, holtWintersAndVectorRdd, period, holtWintersModelType)
}
/**
 * Train the model on a plain RDD
 */
def holtWintersModelTrainKey2(predictedN: Int, trainTsRdd: RDD[(String, DenseVector)], period: Int, holtWintersModelType: String): RDD[(String, Array[Double])] = {
  // Create and fit the HoltWinters model. The resulting RDD elements are (key, HoltWintersModel, Vector).
  val holtWintersAndVectorRdd = trainTsRdd.mapPartitions { lines => {
    lines.map {
      // Match on DenseVector (not Vector) so the element type matches what modelPredict expects
      case (key, denseVector: DenseVector) =>
        // warn("current key: " + key)
        (key, HoltWinters.fitModel(denseVector, period, holtWintersModelType), denseVector)
    }
  }
  }
  modelPredict(predictedN, holtWintersAndVectorRdd, period, holtWintersModelType)
}
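The "linear" fill used in processStepOne interpolates between the nearest known values on either side of a gap. Here is a minimal standalone sketch of that idea on a plain array. The name LinearFillSketch is hypothetical and this is not the sparkts source; in particular, leaving leading and trailing NaN values untouched is a choice made in this sketch.

```scala
object LinearFillSketch {
  // Replaces runs of NaN that sit between two known values with points on the
  // straight line joining those values. NaN at either end is left as is.
  def fillLinear(values: Array[Double]): Array[Double] = {
    val out = values.clone()
    var prev = -1 // index of the most recent non-NaN value, -1 if none yet
    var i = 0
    while (i < out.length) {
      if (!out(i).isNaN) {
        if (prev >= 0 && i - prev > 1) {
          val step = (out(i) - out(prev)) / (i - prev)
          var j = prev + 1
          while (j < i) {
            out(j) = out(prev) + step * (j - prev)
            j += 1
          }
        }
        prev = i
      }
      i += 1
    }
    out
  }
}
```

A series that still has NaN at its edges after filling, or that consists mostly of interpolated points, is the kind of input that is worth routing to the second pass.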
6. Troubleshooting and patching the source
When I used sparkts to forecast tens of thousands of merchants in one batch, I ran into the exception "trust region step has failed to reduce Q". It comes from the method def fitModelWithBOBYQA(ts: Vector, period: Int, modelType:String): HoltWintersModel in the class com.cloudera.sparkts.models.HoltWinters: the call optimizer.optimize(objectiveFunction, goal, bounds,initGuess, maxIter, maxEval) inside that method throws it.
def fitModelWithBOBYQA(ts: Vector, period: Int, modelType:String): HoltWintersModel = {
  val optimizer = new BOBYQAOptimizer(7)
  val objectiveFunction = new ObjectiveFunction(new MultivariateFunction() {
    def value(params: Array[Double]): Double = {
      new HoltWintersModel(modelType, period, params(0), params(1), params(2)).sse(ts)
    }
  })
  // The starting guesses in R's stats:HoltWinters
  val initGuess = new InitialGuess(Array(0.3, 0.1, 0.1))
  val maxIter = new MaxIter(30000)
  val maxEval = new MaxEval(30000)
  val goal = GoalType.MINIMIZE
  val bounds = new SimpleBounds(Array(0.0, 0.0, 0.0), Array(1.0, 1.0, 1.0))
  val optimal = optimizer.optimize(objectiveFunction, goal, bounds,initGuess, maxIter, maxEval)
  val params = optimal.getPoint
  new HoltWintersModel(modelType, period, params(0), params(1), params(2))
}
This method needs to be changed so that it catches the exception, adjusts the parameters, and calls itself again, as follows:
def fitModelWithBOBYQA(ts: Vector, period: Int, modelType: String, initGuess: InitialGuess = new InitialGuess(Array(0.3, 0.1, 0.1))): HoltWintersModel = {
  val optimizer = new BOBYQAOptimizer(7)
  val objectiveFunction = new ObjectiveFunction(new MultivariateFunction() {
    def value(params: Array[Double]): Double = {
      new HoltWintersModel(modelType, period, params(0), params(1), params(2)).sse(ts)
    }
  })
  // The default initGuess (0.3, 0.1, 0.1) matches the starting guesses in R's stats:HoltWinters
  val maxIter = new MaxIter(30000)
  val maxEval = new MaxEval(30000)
  val goal = GoalType.MINIMIZE
  // Setting the lower bounds to a tiny non-zero value makes the exception occur less often
  val bounds = new SimpleBounds(Array(0.00001, 0.00001, 0.00001), Array(1.0, 1.0, 1.0))
  try {
    val optimal: PointValuePair = optimizer.optimize(objectiveFunction, goal, bounds, initGuess, maxIter, maxEval)
    val params = optimal.getPoint
    new HoltWintersModel(modelType, period, params(0), params(1), params(2))
  } catch {
    case _: MathIllegalStateException =>
      // On failure, pick a new random initGuess and run again, repeating until
      // "MathIllegalStateException: trust region step has failed to reduce Q" no longer occurs.
      // Note: there is no retry limit, so a series that always fails keeps recursing.
      // Any other exception propagates unchanged.
      fitModelWithBOBYQA(ts, period, modelType, new InitialGuess(Array(math.random, 0.1, 0.1)))
  }
}
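Retrying with a fresh random starting point can also be given a cap, so that one stubborn series cannot loop forever. The sketch below shows the pattern on its own, without sparkts or Commons Math. The name fitWithRetry, the maxAttempts parameter, randomizing all three starting values, and treating any non-fatal exception as a failed attempt are assumptions of this sketch; `fit` stands in for whatever function runs the optimizer for a given initial guess.

```scala
import scala.util.{Random, Success, Try}

object RetrySketch {
  // Calls `fit` with the default guess first, then with random guesses in [0, 1),
  // stopping at the first success. Returns None after `maxAttempts` failures.
  def fitWithRetry[M](fit: Array[Double] => M, maxAttempts: Int,
                      rng: Random = new Random()): Option[M] = {
    val defaultGuess = Array(0.3, 0.1, 0.1)
    val guesses = Iterator.single(defaultGuess) ++
      Iterator.continually(Array.fill(3)(rng.nextDouble()))
    // Iterators are lazy, so no further attempts run after the first success.
    guesses.take(maxAttempts).map(g => Try(fit(g))).collectFirst { case Success(m) => m }
  }
}
```

A caller that gets None back can log the key and skip that series, or send it to a fallback model, instead of blocking the whole batch.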
7. Prediction results
This is one of the better prediction results…
