

Handling Non-Standard Dates and Weather Data on the MaxCompute Platform: Rank 20 Solution for the Power AI Competition

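The main data source for this competition is the enterprise power consumption table Tianchi_power, which covers more than 1,000 enterprises in the Yangzhong High-tech Zone (the data has been desensitized). It contains the enterprise ID (anonymized), the date, and the power consumption. The fields are listed in the table below: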

tianchi_power

Column             Type    Meaning            Example
record_date        string  Date               20150101
user_id            bigint  Enterprise ID      1
power_consumption  bigint  Power consumption  1031
...                ...     ...                ...

The weather data table is tianchi_weather_data; its contents are shown in the table below:

[Image: weather data table]

Result table submitted by participants

tianchi_power_answer

Column             Type    Meaning                      Example
predict_date       string  Date                         2016/9/1
power_consumption  bigint  Predicted power consumption  1031


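This is a short-term load forecasting problem. In 2010 the State Grid Corporation of China issued the enterprise standard Q/GDW 552-2010, 《电网短期超短期负荷预测技术规范》 (Technical Specification for Short-term and Ultra-short-term Load Forecasting of Power Grids), which introduces the relevant terminology, forecast content, error formulas, and commonly used forecasting algorithms. Because the forecast serves a different purpose in this competition, the forecast content prescribed by the standard (time granularity and forecast horizon) is not followed exactly, and the error metric is a custom formula. The nature of the problem is unchanged, however: it is still short-term load forecasting.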

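In our earlier work on ultra-short-term power output forecasting for photovoltaic plants, we found that missing values and numerical weather prediction data had the largest impact on forecast accuracy. The State Grid enterprise standard also gives a rough overview of the factors that influence load forecasting:

[Figure: factors influencing load forecasting, from the State Grid standard]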

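Since social events and similar factors cannot be known in advance, we focused on missing values and weather data in this competition, and concentrated our work on three areas:

1) Encode and transform the officially provided weather data to build a complete set of weather features;

2) Build an overfitted model to fill in missing values;

3) Build model one on the revised data to predict the trend and model two on the raw data to predict the consumption level (approximate value), then blend the two models with weights.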

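3. Data Preprocessing

Using regexp_extract, the regular-expression string function provided by ODPS SQL, we extract the year, month, and day separately and then convert them into a standard date format. The code is as follows: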

-- Generate the total daily power consumption
DROP TABLE IF EXISTS t_netivs_daily_sum_consumption;
CREATE TABLE IF NOT EXISTS t_netivs_daily_sum_consumption AS
SELECT
	*
	,(year*10000+month*100+day) as day_int -- convert to the 20160101 format
	,(month*100+day) as month_day
	,(year*100+month) as year_month
	,((year-2015)*12+month) as month_index
FROM
(
	SELECT 
		*
		,cast(regexp_extract(record_date,'(.*)/(.*)/(.*)',1) as bigint) as year   -- extract the year
		,cast(regexp_extract(record_date,'(.*)/(.*)/(.*)',2) as bigint) as month  -- extract the month
		,cast(regexp_extract(record_date,'(.*)/(.*)/(.*)',3) as bigint) as day    -- extract the day
	FROM
	(
		SELECT
			record_date
			,sum(power_consumption) as power_consumption
		FROM
			odps_tc_257100_f673506e024.tianchi_power2
		GROUP BY
			record_date
	)t2
)t1
;

With this code, non-standard dates such as 2016/1/1 can be converted into bigint values such as 20160101. Later on, the to_date(cast(xxx as string),'yyyymmdd') function converts such values into a date type.
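As a cross-check of the arithmetic, the same keys can be derived outside of ODPS SQL. The following is a minimal Python sketch, not part of the original solution: `parse_record_date` is a hypothetical helper, and it uses a stricter digit-only pattern than the `(.*)/(.*)/(.*)` expression in the SQL, so malformed strings fail loudly instead of producing odd integers.

```python
import re
from datetime import date


def parse_record_date(record_date: str) -> dict:
    """Derive the integer date keys built by the SQL from a 'YYYY/M/D' string."""
    match = re.fullmatch(r"(\d{4})/(\d{1,2})/(\d{1,2})", record_date.strip())
    if match is None:
        raise ValueError(f"unexpected date format: {record_date!r}")
    year, month, day = (int(part) for part in match.groups())
    date(year, month, day)  # raises ValueError for impossible dates such as 2016/2/30
    return {
        "day_int": year * 10000 + month * 100 + day,   # e.g. 20160101
        "month_day": month * 100 + day,
        "year_month": year * 100 + month,
        "month_index": (year - 2015) * 12 + month,     # months counted from January 2015
    }


keys = parse_record_date("2016/1/1")
print(keys["day_int"], keys["month_index"])
```

Because month_index counts months continuously from January 2015, it avoids the gap between 201512 and 201601 that the year_month key has.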

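3.2 Implementing Holidays

Since uploading and downloading data is in principle not allowed during the competition, the proper approach is to implement holiday handling with case when in ODPS SQL. The code for the holiday and date features is given below: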

-- Generate extended date features
DROP TABLE IF EXISTS 	t_netivs_date_features;
CREATE TABLE IF NOT EXISTS t_netivs_date_features AS
SELECT
	day_int
	,day_index
	,month_index
	,year_index
	,month
	,day
	,(month*100+day) as month_day
	,(year*100+month) as year_month
	,case when (weekday in (5,6) and special_workday == 0) or holiday==1 then 0 else 1 end as workday -- weekday() returns 0 (Monday) to 6 (Sunday)
	,weekofyear
	,day_to_lastday
	,month_day_num
	,weekday
	,holiday
	,special_workday
	,special_holiday
	,day1_before_special_holiday
	,day2_before_special_holiday
	,day3_before_special_holiday
	,day1_before_holiday
	,day2_before_holiday
	,day3_before_holiday
	,day1_after_special_holiday
	,day2_after_special_holiday
	,day3_after_special_holiday
	,day1_after_holiday
	,day2_after_holiday
	,day3_after_holiday
FROM
(
	SELECT
		day_int
		,dt
		,datediff(dt,to_date('2015-01-01','yyyy-mm-dd'),'dd')+1 as day_index
		,datediff(dt,to_date('2015-01-01','yyyy-mm-dd'),'mm')+1 as month_index
		,datepart(dt,'yyyy')-2015+1 as year_index
		,datepart(dt,'yyyy') as year
		,datepart(dt,'mm') as month
		,datepart(dt,'dd') as day
		,datepart(lastday(dt),'dd') as month_day_num
		,weekofyear(dt) as weekofyear
		,datediff(lastday(dt),dt,'dd') as day_to_lastday
		,weekday(dt) as weekday
		,holiday
		,special_workday
		,special_holiday
		
		,case when cast(to_char(dateadd(dt,-1,'dd'),'yyyymmdd') as bigint) in (20150101,20150218,20150305,20150405,20150501,20150620,20150903,20150927,20151001,20160101,20160207,20160404,20160501,20160609,20160915,20161001) then 1 else 0 end as day1_before_special_holiday
		,case when cast(to_char(dateadd(dt,-2,'dd'),'yyyymmdd') as bigint) in (20150101,20150218,20150305,20150405,20150501,20150620,20150903,20150927,20151001,20160101,20160207,20160404,20160501,20160609,20160915,20161001) then 1 else 0 end as day2_before_special_holiday
		,case when cast(to_char(dateadd(dt,-3,'dd'),'yyyymmdd') as bigint) in (20150101,20150218,20150305,20150405,20150501,20150620,20150903,20150927,20151001,20160101,20160207,20160404,20160501,20160609,20160915,20161001) then 1 else 0 end as day3_before_special_holiday
		
		,case when cast(to_char(dateadd(dt,-1,'dd'),'yyyymmdd') as bigint) in (20150101,20150218,20150404,20150501,20150620,20150903,20150927,20151001,20160101,20160207,20160404,20160501,20160609,20160915,20161001) then 1 else 0 end as day1_before_holiday
		,case when cast(to_char(dateadd(dt,-2,'dd'),'yyyymmdd') as bigint) in (20150101,20150218,20150404,20150501,20150620,20150903,20150927,20151001,20160101,20160207,20160404,20160501,20160609,20160915,20161001) then 1 else 0 end as day2_before_holiday
		,case when cast(to_char(dateadd(dt,-3,'dd'),'yyyymmdd') as bigint) in (20150101,20150218,20150404,20150501,20150620,20150903,20150927,20151001,20160101,20160207,20160404,20160501,20160609,20160915,20161001) then 1 else 0 end as day3_before_holiday
		
		,case when cast(to_char(dateadd(dt,1,'dd'),'yyyymmdd') as bigint) in (20150101,20150219,20150305,20150405,20150501,20150620,20150903,20150927,20151001,20160101,20160208,20160404,20160501,20160609,20160915,20161001) then 1 else 0 end as day1_after_special_holiday
		,case when cast(to_char(dateadd(dt,2,'dd'),'yyyymmdd') as bigint) in (20150101,20150219,20150305,20150405,20150501,20150620,20150903,20150927,20151001,20160101,20160208,20160404,20160501,20160609,20160915,20161001) then 1 else 0 end as day2_after_special_holiday
		,case when cast(to_char(dateadd(dt,3,'dd'),'yyyymmdd') as bigint) in (20150101,20150219,20150305,20150405,20150501,20150620,20150903,20150927,20151001,20160101,20160208,20160404,20160501,20160609,20160915,20161001) then 1 else 0 end as day3_after_special_holiday
		
		,case when cast(to_char(dateadd(dt,1,'dd'),'yyyymmdd') as bigint) in (20150103,20150224,20150406,20150503,20150622,20150905,20150927,20151007,20160101,20160213,20160404,20160502,20160611,20160917,20161007) then 1 else 0 end as day1_after_holiday
		,case when cast(to_char(dateadd(dt,2,'dd'),'yyyymmdd') as bigint) in (20150103,20150224,20150406,20150503,20150622,20150905,20150927,20151007,20160101,20160213,20160404,20160502,20160611,20160917,20161007) then 1 else 0 end as day2_after_holiday
		,case when cast(to_char(dateadd(dt,3,'dd'),'yyyymmdd') as bigint) in (20150103,20150224,20150406,20150503,20150622,20150905,20150927,20151007,20160101,20160213,20160404,20160502,20160611,20160917,20161007) then 1 else 0 end as day3_after_holiday
		
	FROM
	(
		SELECT
			day_int
			,to_date(to_char(day_int),'yyyymmdd') as dt
			,case when day_int in (20150101,20150102,20150103,20150218,20150219,20150220,20150221,20150222,20150223,20150224,20150404,20150405,20150406,20150501,20150502,20150503,20150620,20150621,20150622,20150903,20150904,20150905,20150927,20151001,20151002,20151003,20151004,20151005,20151006,20151007,20160101,20160207,20160208,20160209,20160210,20160211,20160212,20160213,20160404,20160501,20160502,20160609,20160610,20160611,20160915,20160916,20160917,20161001,20161002,20161003,20161004,20161005,20161006,20161007) then 1 else 0 end as holiday
			,case when day_int in (20150104,20150215,20150228,20150906,20151010,20160206,20160214,20160612,20160918,20161008,20161009) then 1 else 0 end as special_workday
			,case when day_int in (20150101,20150218,20150219,20150305,20150405,20150501,20150620,20150903,20150927,20151001,20160101,20160207,20160208,20160404,20160501,20160609,20160915,20161001) then 1 else 0 end as special_holiday
			
		FROM
			t_netivs_tianchi_weather_data	
	)t1
)t2
;

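Analysis of the data readily shows that holidays have a very large effect on total daily power consumption, and that the effect persists over several days. For example, consumption may rise or drop sharply as some holidays approach, and after some holidays it stays higher or lower for several consecutive days. Holidays are therefore handled in some detail here, with features added for the 1, 2, and 3 days before and after each holiday.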
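The idea of the workday flag and the before/after-holiday flags can be illustrated with a small Python sketch. It is only an illustration under stated assumptions: it uses just the National Day 2016 dates that appear in the holiday and special_workday lists of the SQL, and the feature names `holiday_in_k_days` and `holiday_k_days_ago` are hypothetical names chosen to make the direction of each offset explicit.

```python
from datetime import date, timedelta

# National Day 2016, taken from the holiday and special_workday lists in the SQL.
HOLIDAYS = {date(2016, 10, d) for d in range(1, 8)}
MAKEUP_WORKDAYS = {date(2016, 10, 8), date(2016, 10, 9)}


def date_features(d, window=3):
    """Build workday and holiday-proximity flags for a single date."""
    is_weekend = d.weekday() >= 5  # Python convention: Monday=0 ... Sunday=6
    features = {
        "holiday": int(d in HOLIDAYS),
        "special_workday": int(d in MAKEUP_WORKDAYS),
        # a weekend counts as a day off unless it is a make-up workday
        "workday": 0 if (is_weekend and d not in MAKEUP_WORKDAYS) or d in HOLIDAYS else 1,
    }
    for k in range(1, window + 1):
        features[f"holiday_in_{k}_days"] = int(d + timedelta(days=k) in HOLIDAYS)
        features[f"holiday_{k}_days_ago"] = int(d - timedelta(days=k) in HOLIDAYS)
    return features


print(date_features(date(2016, 9, 30)))
```

Spelling out the offset direction in the feature name makes it easier to verify that a "before holiday" flag really fires on the days leading up to the holiday rather than on the days following it.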

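As the weather data above shows, the weather condition, wind speed, wind direction, and similar fields are strings and must be converted to numeric values before they can be used in a machine learning model. Because the set of possible strings is limited, one option is to sort the strings and use each string's rank as its code, feeding it directly to the model as an input feature. This is simple and easy to implement with the row_number function built into ODPS SQL. The drawback of this approach is equally clear: it does not exploit the relationships between weather types, such as that between 大雨 (heavy rain) and 大到暴雨 (heavy rain to rainstorm). We therefore used OPEN_MR to process the weather data in detail, along the following lines: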

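1) List every distinct value that appears in the data table and examine its composition and categories;

2) Some weather entries contain a single type, such as 大雨, 中雨, or 小雨 (heavy, moderate, or light rain), while others contain two, such as 大到暴雨 (heavy rain to rainstorm) or 多云转阴 (cloudy turning overcast). All entries are therefore normalized: an entry with a single type is represented as two identical types;

3) For each weather entry, derive a weather type (clear, snow, rain, and so on), a weather level (light rain, moderate rain, heavy rain, rainstorm, and so on, numbered from 1), and a weather combination (weather type + weather level);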
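The three steps can be sketched compactly in Python. This is a simplified illustration rather than the authors' implementation: the helper names are hypothetical, the numeric codes mirror the ones used in the MapReduce class in this article (snow 1, rain 2, clear 3, overcast 4, cloudy 5), and exact dictionary lookups are used instead of substring checks.

```python
RAIN_LEVELS = ["小雨", "小到中雨", "中雨", "中到大雨", "大雨", "大到暴雨", "暴雨", "阵雨", "雷雨", "雷阵雨"]
SNOW_LEVELS = ["小雪", "中雪", "大雪", "雨夹雪"]
OTHER_TYPES = {"晴": 3, "阴": 4, "多云": 5}  # snow is type 1, rain is type 2


def split_weather(weather):
    """Step 2: always return two terms; a single term is duplicated."""
    parts = weather.rstrip("~").replace("转", "~").split("~")
    first = parts[0]
    second = parts[1] if len(parts) > 1 else first
    return first, second


def encode_term(term):
    """Step 3: weather type, level within the type, and combined index."""
    if term in SNOW_LEVELS:
        type_code, level = 1, SNOW_LEVELS.index(term) + 1
    elif term in RAIN_LEVELS:
        type_code, level = 2, RAIN_LEVELS.index(term) + 1
    else:
        type_code, level = OTHER_TYPES.get(term, 0), 0
    return {"type": type_code, "level": level, "index": type_code * 100 + level}


first, second = split_weather("多云转阴")
print(encode_term(first), encode_term(second))
```

Keeping type and level as separate features lets a tree model split on "any rain" as well as on "rain heavier than level 4", which a single rank code cannot express.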

Weather data processed this way has the format created by the following ODPS SQL statement, which also serves as the output table of the OPEN_MR job:

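-- Output table for the MapReduce job that processes the weather data
-- The December weather data provided online has already been processed as well, so no further changes are needed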
-- DROP TABLE IF EXISTS t_netivs_encode_weather;
CREATE TABLE IF NOT EXISTS t_netivs_encode_weather (
	day_int 			bigint
	,temperature_high 	bigint
	,temperature_low 	bigint
	,weather1			bigint
	,weather1_level		bigint
	,weather1_type		bigint
	,weather2			bigint
	,weather2_level		bigint
	,weather2_type		bigint
	,wind_direction		bigint
	,wind_speed			double
	,wind_speed1		double
	,wind_speed2		double
)
;

To parse the weather data, we wrote an OPEN_MR job. Its core code is as follows:

package powerai.weather_preprocess;


import java.util.regex.*;
import java.util.Map;
import java.util.HashMap;

import com.aliyun.odps.data.Record;

public class WeatherPreprocesser {
	// Record used for the final output
	public Record m_output_vals = null;
	//private AntibotUnitily util = new AntibotUnitily();
	public static long 		m_day_int 		= 0;
	public static long 		m_temp_high 	= 0;
	public static long 		m_temp_low 		= 0;

	//public static String 	m_org_weather;
	//public static String 	m_org_wind_direction;
	//public static String 	m_org_wind_speed;
	
	// Output columns: day_int:bigint,temperature_high:bigint,temperature_low:bigint,
	// weather1:bigint,weather1_level:bigint,weather1_type:bigint,weather2:bigint,weather2_level:bigint,weather2_type:bigint,wind_direction:bigint,wind_speed:double,wind_speed1:double,wind_speed2:double
	long	m_weather1			= 0L;
	long 	m_weather1_level 	= 0L;
	long	m_weather1_type 	= 0L;
	long	m_weather2			= 0L;
	long	m_weather2_level 	= 0L;
	long	m_weather2_type 	= 0L;
	long	m_wind_direction 	= 0L;
	double	m_wind_speed		= 0.0;
	double 	m_wind_speed1  		= 0.0;
	double 	m_wind_speed2  		= 0.0;
	
	// All values: 小雨、小到中雨、中雨、中到大雨、大雨、大到暴雨、暴雨、阵雨、雷雨、雷阵雨、小雪、中雪、大雪、雨夹雪、晴、阴、多云
	// Rain: 小雨、小到中雨、中雨、中到大雨、大雨、大到暴雨、暴雨、阵雨、雷雨、雷阵雨
	// Snow: 小雪、中雪、大雪、雨夹雪
	// Clear: 晴
	// Overcast: 阴
	// Cloudy: 多云
		
	public void weather_encode(long day_int, long temperature_high, long temperature_low, String weather, String wind_direction, String wind_speed, Record vals){
				
		m_output_vals 		= vals;
		m_day_int 			= day_int;
		m_temp_high 		= temperature_high;
		m_temp_low 			= temperature_low;
				
		reset();
		
		weather_parser(weather);
		wind_direction_parser(wind_direction);
		wind_speed_parse(wind_speed);
				
		// Write out the features
		output();
	}
	
	private void reset(){

		m_weather1			= 0L;
		m_weather1_level 	= 0L;
		m_weather1_type 	= 0L;
		m_weather2			= 0L;
		m_weather2_level 	= 0L;
		m_weather2_type 	= 0L;
		m_wind_direction 	= 0L;	
		m_wind_speed  		= 0.0;
		m_wind_speed1  		= 0.0;
		m_wind_speed2  		= 0.0;
	}
	
	
	// -------------- Re-encode the weather condition --------------------------------//
	private void weather_parser(String weather){
			
		String weather1,weather2;
		
		// If the last character is ~, the record is presumably incomplete; just drop the ~
		if(weather.endsWith("~")){
			weather = weather.substring(0, weather.length()-1);
		}
		
		weather = weather.replace("转", "~");
		
		// Split the value into its two parts
        if(weather.contains("~")){
			
        	weather1 = weather.split("~")[0];
        	weather2 = weather.split("~")[1];
        }
        else {
        	weather1= weather;
        	weather2 = weather;
        }
        
        // Parse weather1 and weather2
        // Values: 小雨、小到中雨、中雨、中到大雨、大雨、大到暴雨、暴雨、阵雨、雷雨、雷阵雨、小雪、中雪、大雪、雨夹雪、晴、阴、多云
        m_weather1 = get_weather_index(weather1);
        m_weather1_level = get_weather_level(weather1);
        m_weather1_type = get_weather_type(weather1);
        
        m_weather2 = get_weather_index(weather2);
        m_weather2_level = get_weather_level(weather2);
        m_weather2_type = get_weather_type(weather2);
        
	}
	
	private long get_weather_index(String strWeather) {		
		return get_weather_type(strWeather)*100+get_weather_level(strWeather);
	}
	
	private long get_weather_level(String strWeather) {
		if(strWeather.contains("雪")) {
			// Snow: 小雪、中雪、大雪、雨夹雪
			if(strWeather.contains("小雪")) {
				return 1L;
			}
			else if(strWeather.contains("中雪")) {
				return 2L;
			}
			else if(strWeather.contains("大雪")) {
				return 3L;
			}
			else if(strWeather.contains("雨夹雪")) {
				return 4L;
			}
			else {
				return 0L;
			}
		}
		else if(strWeather.contains("雨")) {
			// Rain: 小雨、小到中雨、中雨、中到大雨、大雨、大到暴雨、暴雨、阵雨、雷雨、雷阵雨
			if(strWeather.contains("小雨")) {
				return 1L;
			}
			else if(strWeather.contains("小到中雨")) {
				return 2L;
			}
			else if(strWeather.contains("中雨")) {
				return 3L;
			}
			else if(strWeather.contains("中到大雨")) {
				return 4L;
			}
			else if(strWeather.contains("大雨")) {
				return 5L;
			}
			else if(strWeather.contains("大到暴雨")) {
				return 6L;
			}
			else if(strWeather.contains("暴雨")) {
				return 7L;
			}
			else if(strWeather.contains("雷阵雨")) {
				// Test 雷阵雨 before 阵雨: "雷阵雨" contains "阵雨" as a substring
				return 10L;
			}
			else if(strWeather.contains("阵雨")) {
				return 8L;
			}
			else if(strWeather.contains("雷雨")) {
				return 9L;
			}
			else {
				return 0L;
			}
		}
		else {		
			return 0L;
		}
	}
	
	private long get_weather_type(String strWeather) {
		if(strWeather.contains("雪")) {
			return 1L;
		}
		else if(strWeather.contains("雨")) {
			return 2L;
		}
		else if(strWeather.contains("晴")) {
			return 3L;
		}
		else if(strWeather.contains("阴")) {
			return 4L;
		}
		else if(strWeather.contains("多云")) {
			return 5L;
		}
		else{
			return 0L;
		}
	}

	// -------------- Re-encode the wind direction -----------------------------------//
	private void wind_direction_parser(String wind_direction){
		m_wind_direction = get_wind_direction(wind_direction);
	}
	
	private long get_wind_direction(String strWindDirection) {
		
		if(strWindDirection == null ||strWindDirection.trim().length() <2) {
			return 0L;
		}
		
		// 东风 (E), 东南风 (SE), 南风 (S), 西南风 (SW), 西风 (W), 西北风 (NW), 北风 (N), 东北风 (NE)
		// Compound directions are tested first: "西南风" contains "南风" and "东北风" contains "北风"
		if(strWindDirection.contains("东南风")) {
			return 2L;
		}
		else if(strWindDirection.contains("西南风")) {
			return 4L;
		}
		else if(strWindDirection.contains("西北风")) {
			return 6L;
		}
		else if(strWindDirection.contains("东北风")) {
			return 8L;
		}
		else if(strWindDirection.contains("东风")) {
			return 1L;
		}
		else if(strWindDirection.contains("南风")) {
			return 3L;
		}
		else if(strWindDirection.contains("西风")) {
			return 5L;
		}
		else if(strWindDirection.contains("北风")) {
			return 7L;
		}
		else {
			return 0L;
		}
	} 
	
	// -------------- Re-encode the wind speed ---------------------------------------//
	private void wind_speed_parse(String strWindSpeed){
		String strWindSpeed1,strWindSpeed2;
		
		
		strWindSpeed = strWindSpeed.replace("转", "~");
		
		// Split the value into its two parts
        if(strWindSpeed.contains("~")){
			
        	strWindSpeed1 = strWindSpeed.split("~")[0];
        	strWindSpeed2 = strWindSpeed.split("~")[1];
        }
        else {
        	strWindSpeed1= strWindSpeed;
        	strWindSpeed2 = strWindSpeed;
        }
        
        // Parse strWindSpeed1 and strWindSpeed2
        // Values (微风 = light breeze, 级 = wind force level): 微风、1级、2级、小于3级、3级、3-4级、4级、4-5级、5级、5-6级、6级、6-7级、7级
        m_wind_speed1 	= get_wind_speed_code(strWindSpeed1);
        m_wind_speed2 	= get_wind_speed_code(strWindSpeed2);
        m_wind_speed 	= m_wind_speed1*100 + m_wind_speed2;
	}
	
	private double get_wind_speed_code(String strWindSpeed) {
        // Values (微风 = light breeze, 级 = wind force level): 微风、1级、2级、小于3级、3级、3-4级、4级、4-5级、5级、5-6级、6级、6-7级、7级
		if(strWindSpeed.contains("微风")) {
			return 0.5;
		}
		else if(strWindSpeed.contains("1级")) {
			return 1.0;
		}
		else if(strWindSpeed.contains("2级")) {
			return 2.0;
		}
		else if(strWindSpeed.contains("小于3级")) {
			return 2.5;
		}
		else if(strWindSpeed.contains("3级")) {
			return 3.0;
		}
		else if(strWindSpeed.contains("3-4级")) {
			return 3.5;
		}
		else if(strWindSpeed.contains("4级")) {
			return 4.0;
		}
		else if(strWindSpeed.contains("4-5级")) {
			return 4.5;
		}
		else if(strWindSpeed.contains("5级")) {
			return 5.0;
		}
		else if(strWindSpeed.contains("5-6级")) {
			return 5.5;
		}
		else if(strWindSpeed.contains("6级")) {
			return 6.0;
		}
		else if(strWindSpeed.contains("6-7级")) {
			return 6.5;
		}
		else if(strWindSpeed.contains("7级")) {
			return 7.0;
		}
		else {
			return 0.0;
		}
	}

	private void output(){
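		int idx = 0;

		// The first three columns of t_netivs_encode_weather are day_int, temperature_high, temperature_low
		m_output_vals.setBigint(idx++, m_day_int);
		m_output_vals.setBigint(idx++, m_temp_high);
		m_output_vals.setBigint(idx++, m_temp_low);
		m_output_vals.setBigint(idx++, m_weather1);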
		m_output_vals.setBigint(idx++, m_weather1_level);
		m_output_vals.setBigint(idx++, m_weather1_type);
		m_output_vals.setBigint(idx++, m_weather2);
		m_output_vals.setBigint(idx++, m_weather2_level);
		m_output_vals.setBigint(idx++, m_weather2_type);
		m_output_vals.setBigint(idx++, m_wind_direction);		
		m_output_vals.setDouble(idx++, m_wind_speed);
		m_output_vals.setDouble(idx++, m_wind_speed1);
		m_output_vals.setDouble(idx++, m_wind_speed2);
		
	}
	
}
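Chains of substring checks such as `contains` are sensitive to the order of the tests, because some labels are substrings of others ("西南风" contains "南风", "雷阵雨" contains "阵雨"). The following Python sketch shows the effect on the wind-direction labels and one way to make the lookup order-independent. The function names are hypothetical, and matching the longest label first is a swapped-in technique for illustration, not something the original class does.

```python
WIND_DIRECTION_CODES = {
    "东风": 1, "东南风": 2, "南风": 3, "西南风": 4,
    "西风": 5, "西北风": 6, "北风": 7, "东北风": 8,
}


def first_match(text, codes):
    """Return the code of the first label found, in dictionary order."""
    for label, code in codes.items():
        if label in text:
            return code
    return 0


def longest_match(text, codes):
    """Return the code of the longest label found, so compound labels win."""
    for label in sorted(codes, key=len, reverse=True):
        if label in text:
            return codes[label]
    return 0


for direction in ("西南风", "东北风", "北风"):
    print(direction, first_match(direction, WIND_DIRECTION_CODES),
          longest_match(direction, WIND_DIRECTION_CODES))
```

With the dictionary order shown, `first_match` maps 西南风 to the code for 南风, while `longest_match` returns the intended code regardless of how the table is ordered.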

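-- User 618 has a value of 1 on all 700 days; there is no point revising such a user, since the data will surely stay missing (power consumption of 1)
-- This raises a question: when the power consumption is 1, in which cases should it be revised and in which not?
-- Considerations:
-- 1) The user's consumption must be large enough, otherwise it cannot affect the overall forecast;
-- 2) There must be enough values greater than 1, otherwise the revision cannot be accurate.
-- We can therefore consider:
-- 1) A rule: after filtering out missing values, compute the mean daily consumption over the remaining days, and only count users whose mean exceeds 100

-- user_ids to watch closely:
-- 1307 (1310 is exactly the same), 118 missing
-- 514, 200 missing
-- 1355, 118 missing
-- 1133, 547 missing
-- 1308 (1309 is exactly the same), 568 missing
-- 741, 672 missing

-- user_ids to watch closely, selected by the most recent 3 months:
-- 1146, missing days in the last 3 months: (22,31,8)
-- 1309, missing days in the last 3 months: (30,15,29)
-- 1147, missing days in the last 3 months: (30,19,0)
-- 1301, missing days in the last 3 months: (30,31,10)
-- 200, missing days in the last 3 months: (30,31,10)
-- 654, missing days in the last 3 months: (5,31,30)
-- 1421, missing days in the last 3 months: (30,31,7)
-- 650, missing days in the last 3 months: (0,0,10)

-- After detailed analysis, the proposed rules for filling missing data:
-- 1. If all 30 days of November are missing, set all historical power consumption to 1;
-- 2. Apart from users missing 30 days in November, do not process users with non_default_power_consumption_median<2500;
-- 3. Do not process users with more than 30 missing days in total;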

DROP TABLE IF EXISTS t_netivs_user_missing_info;
CREATE TABLE IF NOT EXISTS t_netivs_user_missing_info AS
select
	case when t11.user_id is not null then t11.user_id else t2.user_id end as user_id
	,case when t11.missing_day_cnt is null then 0 else t11.missing_day_cnt end as missing_day_cnt
	,case when t11.first_default_day_int is null then 0 else t11.first_default_day_int end as first_default_day_int
	,case when t11.last_default_day_int is null then 0 else  t11.last_default_day_int end as last_default_day_int
	,case when t11.last1month_default_day_cnt is null then 0 else  t11.last1month_default_day_cnt end as last1month_default_day_cnt
	,case when t11.last2month_default_day_cnt is null then 0 else  t11.last2month_default_day_cnt end as last2month_default_day_cnt
	,case when t11.last3month_default_day_cnt is null then 0 else  t11.last3month_default_day_cnt end as last3month_default_day_cnt
	,case when t2.power_consumption_avg is null then 0 else  t2.power_consumption_avg end as power_consumption_avg
	,case when t2.power_consumption_median is null then 0 else  t2.power_consumption_median end as power_consumption_median
	,case when t2.power_consumption_max is null then 0 else  t2.power_consumption_max end as power_consumption_max
	,case when t2.power_consumption_min is null then 0 else  t2.power_consumption_min end as power_consumption_min
	,case when t2.first_non_default_day_int is null then 0 else  t2.first_non_default_day_int end as first_non_default_day_int
	,case when t2.last_non_default_day_int is null then 0 else  t2.last_non_default_day_int end as last_non_default_day_int
from
(
	select
		* 
	from
	(
		select 
			user_id
			,count(*) as missing_day_cnt
			,min(day_int) as first_default_day_int
			,max(day_int) as last_default_day_int
			,SUM(case when day_int>=20161101 and day_int<20161201 then 1 else 0 end) as last1month_default_day_cnt
			,SUM(case when day_int>=20161001 and day_int<20161101 then 1 else 0 end) as last2month_default_day_cnt
			,SUM(case when day_int>=20160901 and day_int<20161001 then 1 else 0 end) as last3month_default_day_cnt			
		from 
			t_netivs_ext_power
		where
			power_consumption=1
		group by
			user_id
	)t1
		where missing_day_cnt>1
)t11
FULL OUTER JOIN
(
	select 
		user_id
		,avg(power_consumption) as power_consumption_avg
		,median(power_consumption) as power_consumption_median
		,max(power_consumption) as power_consumption_max
		,min(power_consumption) as power_consumption_min
		,min(day_int) as first_non_default_day_int
		,max(day_int) as last_non_default_day_int
	from 
		t_netivs_ext_power
	where
		power_consumption<>1
	group by
		user_id
)t2
ON t11.user_id = t2.user_id
;

SELECT * FROM t_netivs_user_missing_info where missing_day_cnt>0 ORDER BY power_consumption_median desc limit 500;

-- Generate the list of (user_id, day_int) pairs to be filled with xgboost
DROP TABLE IF EXISTS t_netivs_xgb_fill_user_day_list;
DROP TABLE IF EXISTS t_netivs_gbdt_fill_user_day_list;
CREATE TABLE IF NOT EXISTS t_netivs_gbdt_fill_user_day_list AS
	SELECT
		user_id
		,day_int
	FROM
		t_netivs_ext_power
	WHERE
		power_consumption =1 and user_id in 
		(
			SELECT 
				user_id
			FROM
				t_netivs_user_missing_info
			WHERE
				power_consumption_median>2500 and missing_day_cnt<30 and missing_day_cnt>0
		)		
;


-- Generate the list of user_ids used to train the xgboost model
DROP TABLE IF EXISTS t_netivs_xgb_fill_train_user_list;
DROP TABLE IF EXISTS t_netivs_gbdt_fill_train_user_list;
CREATE TABLE IF NOT EXISTS t_netivs_gbdt_fill_train_user_list AS
	SELECT
		user_id
		,day_int
	FROM
		t_netivs_ext_power
	WHERE
		power_consumption <> 1 and user_id in 
		(
			SELECT 
				user_id
			FROM
				t_netivs_user_missing_info
			WHERE
				power_consumption_median>2500 and missing_day_cnt<30
		)
;


-- Generate the list of user_ids whose historical data is to be cleared entirely
DROP TABLE IF EXISTS t_netivs_clear_historical_data_user_list;
CREATE TABLE IF NOT EXISTS t_netivs_clear_historical_data_user_list AS
	SELECT 
		user_id
	FROM
		t_netivs_user_missing_info
	WHERE
		last1month_default_day_cnt=30
;
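The selection logic of the three statements above can be summarized in a few lines of Python. This is a simplified sketch with hypothetical function names, not the authors' code: it treats a daily consumption of 1 as the missing marker, uses the thresholds from the SQL (median above 2500, fewer than 30 missing days, 30 missing days in the last month), and ignores the extra `missing_day_cnt>1` filter applied when the statistics table is built.

```python
from statistics import median

MISSING_MARKER = 1  # a daily consumption of 1 is treated as missing in the SQL


def summarize_user(daily_consumption, last_month_days):
    """daily_consumption: dict day_int -> consumption; last_month_days: set of day_int."""
    missing = [d for d, v in daily_consumption.items() if v == MISSING_MARKER]
    observed = [v for v in daily_consumption.values() if v != MISSING_MARKER]
    return {
        "missing_day_cnt": len(missing),
        "last1month_missing_cnt": sum(d in last_month_days for d in missing),
        "median": median(observed) if observed else 0,
    }


def classify_user(stats, median_threshold=2500, max_missing=30):
    """Decide whether a user's history is cleared, filled by the model, or kept."""
    if stats["last1month_missing_cnt"] == 30:
        return "clear_history"
    if stats["median"] > median_threshold and 0 < stats["missing_day_cnt"] < max_missing:
        return "fill"
    return "keep"


november = {20161100 + i: 3000 for i in range(1, 31)}
november[20161105] = MISSING_MARKER
print(classify_user(summarize_user(november, set(november))))
```

Only large users with a small number of gaps are filled, which matches the reasoning in the comments: small users barely move the total, and users with long gaps cannot be reconstructed reliably.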


-- Generate the GBDT training set

DROP TABLE IF EXISTS t_netivs_gbdt_fill_consumption_features;
CREATE TABLE IF NOT EXISTS t_netivs_gbdt_fill_consumption_features AS
	SELECT
		t1.user_id
		,t1.day_int
		,case when t2.weekly_power_consumption_avg is null then 0 else t2.weekly_power_consumption_avg end as weekly_power_consumption_avg
		,case when t2.weekly_power_consumption_median is null then 0 else t2.weekly_power_consumption_median end as weekly_power_consumption_median
		,case when t3.monthly_power_consumption_avg is null then 0 else t3.monthly_power_consumption_avg end as monthly_power_consumption_avg
		,case when t3.monthly_power_consumption_median is null then 0 else t3.monthly_power_consumption_median end as monthly_power_consumption_median
		,case when t4.last_weekly_power_consumption_avg is null then 0 else t4.last_weekly_power_consumption_avg end as last_weekly_power_consumption_avg
		,case when t4.last_weekly_power_consumption_median is null then 0 else t4.last_weekly_power_consumption_median end as last_weekly_power_consumption_median
		,case when t5.last_monthly_power_consumption_avg is null then 0 else t5.last_monthly_power_consumption_avg end as last_monthly_power_consumption_avg
		,case when t5.last_monthly_power_consumption_median is null then 0 else t5.last_monthly_power_consumption_median end as last_monthly_power_consumption_median
	FROM
		t_netivs_ext_power t1
	LEFT OUTER JOIN
	(
		SELECT
			user_id			
			,weekofyear
			,avg(power_consumption) as weekly_power_consumption_avg
			,median(power_consumption) as weekly_power_consumption_median
		FROM
			t_netivs_ext_power
		WHERE
			power_consumption<>1
		GROUP BY
			user_id,weekofyear
	)t2
	ON t1.user_id = t2.user_id and t1.weekofyear = t2.weekofyear
	LEFT OUTER JOIN
	(
		SELECT
			user_id			
			,year_month
			,avg(power_consumption) as monthly_power_consumption_avg
			,median(power_consumption) as monthly_power_consumption_median
		FROM
			t_netivs_ext_power
		WHERE
			power_consumption<>1
		GROUP BY
			user_id,year_month
	)t3
	ON t1.user_id = t3.user_id and t1.year_month = t3.year_month
	LEFT OUTER JOIN
	(
		SELECT
			user_id			
			,weekofyear
			,avg(power_consumption) as last_weekly_power_consumption_avg
			,median(power_consumption) as last_weekly_power_consumption_median
		FROM
			t_netivs_ext_power
		WHERE
			power_consumption<>1
		GROUP BY
			user_id,weekofyear
	)t4
	ON t1.user_id = t4.user_id and t1.weekofyear = t4.weekofyear+1
	LEFT OUTER JOIN
	(
		SELECT
			user_id			
			,year_month
			,avg(power_consumption) as last_monthly_power_consumption_avg
			,median(power_consumption) as last_monthly_power_consumption_median
		FROM
			t_netivs_ext_power
		WHERE
			power_consumption<>1
		GROUP BY
			user_id,year_month
	)t5
	ON t1.user_id = t5.user_id and t1.year_month = t5.year_month+1
;
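The last-month join above compares `t1.year_month` with `t5.year_month+1`. Since year_month is an integer such as 201512, adding 1 yields 201513 rather than 201601, so January rows would not seem to find a previous month and would fall back to the default of 0. A minimal Python sketch of a key that rolls over at the year boundary is shown below; `previous_year_month` is a hypothetical helper and not part of the original solution.

```python
def previous_year_month(year_month):
    """Return the yyyymm key of the month before the given yyyymm key."""
    year, month = divmod(year_month, 100)
    if month == 1:
        return (year - 1) * 100 + 12  # January rolls back to December of the previous year
    return year_month - 1


for key in (201601, 201602, 201512):
    print(key, previous_year_month(key))
```

An equivalent option is to join on a continuous month counter such as `(year-2015)*12+month`, where the previous month is always the counter minus 1.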


DROP TABLE IF EXISTS t_netivs_gbdt_fill_train_features;
CREATE TABLE IF NOT EXISTS t_netivs_gbdt_fill_train_features AS
	SELECT
		t1.user_id
		,t1.day_int
		,t2.temperature_high
		,t2.temperature_low
		,t2.weather1
		,t2.weather1_level
		,t2.weather1_type
		,t2.weather2
		,t2.weather2_level
		,t2.weather2_type
		,t2.wind_direction
		,t2.wind_speed
		,t2.wind_speed1
		,t2.wind_speed2
		,t3.day_index
		,t3.month_index
		,t3.year_index
		,t3.month
		,t3.day
		,t3.workday
		,t3.weekday
		,t3.holiday
		,t3.special_workday
		,t3.special_holiday
		,t3.day1_before_special_holiday
		,t3.day2_before_special_holiday
		,t3.day3_before_special_holiday
		,t3.day1_before_holiday
		,t3.day2_before_holiday
		,t3.day3_before_holiday
		,t3.day1_after_special_holiday
		,t3.day2_after_special_holiday
		,t3.day3_after_special_holiday
		,t3.day1_after_holiday
		,t3.day2_after_holiday
		,t3.day3_after_holiday
		,t4.weekly_power_consumption_avg
		,t4.weekly_power_consumption_median
		,t4.monthly_power_consumption_avg
		,t4.monthly_power_consumption_median
		,t4.last_weekly_power_consumption_avg
		,t4.last_weekly_power_consumption_median
		,t4.last_monthly_power_consumption_avg
		,t4.last_monthly_power_consumption_median
		,t5.power_consumption
	FROM
		t_netivs_gbdt_fill_train_user_list t1
	LEFT OUTER JOIN
		t_netivs_encode_weather t2
	ON t1.day_int = t2.day_int
	LEFT OUTER JOIN
		t_netivs_date_features t3
	ON t1.day_int = t3.day_int
	LEFT OUTER JOIN
		t_netivs_gbdt_fill_consumption_features t4
	ON t1.user_id = t4.user_id and t1.day_int = t4.day_int
	LEFT OUTER JOIN
		t_netivs_ext_power t5
	ON t1.user_id = t5.user_id and t1.day_int = t5.day_int
	
;

-- Generate the test set for GBDT filling
DROP TABLE IF EXISTS t_netivs_gbdt_fill_test_features;
CREATE TABLE IF NOT EXISTS t_netivs_gbdt_fill_test_features AS
	SELECT
		t1.user_id
		,t1.day_int
		,t2.temperature_high
		,t2.temperature_low
		,t2.weather1
		,t2.weather1_level
		,t2.weather1_type
		,t2.weather2
		,t2.weather2_level
		,t2.weather2_type
		,t2.wind_direction
		,t2.wind_speed
		,t2.wind_speed1
		,t2.wind_speed2
		,t3.day_index
		,t3.month_index
		,t3.year_index
		,t3.month
		,t3.day
		,t3.workday
		,t3.weekday
		,t3.holiday
		,t3.special_workday
		,t3.special_holiday
		,t3.day1_before_special_holiday
		,t3.day2_before_special_holiday
		,t3.day3_before_special_holiday
		,t3.day1_before_holiday
		,t3.day2_before_holiday
		,t3.day3_before_holiday
		,t3.day1_after_special_holiday
		,t3.day2_after_special_holiday
		,t3.day3_after_special_holiday
		,t3.day1_after_holiday
		,t3.day2_after_holiday
		,t3.day3_after_holiday
		,t4.weekly_power_consumption_avg
		,t4.weekly_power_consumption_median
		,t4.monthly_power_consumption_avg
		,t4.monthly_power_consumption_median
		,t4.last_weekly_power_consumption_avg
		,t4.last_weekly_power_consumption_median
		,t4.last_monthly_power_consumption_avg
		,t4.last_monthly_power_consumption_median
	FROM
		t_netivs_gbdt_fill_user_day_list t1
	LEFT OUTER JOIN
		t_netivs_encode_weather t2
	ON t1.day_int = t2.day_int
	LEFT OUTER JOIN
		t_netivs_date_features t3
	ON t1.day_int = t3.day_int
	LEFT OUTER JOIN
		t_netivs_gbdt_fill_consumption_features t4
	ON t1.user_id = t4.user_id and t1.day_int = t4.day_int
;

-- Overfit a gbdt model and use its prediction as the actual value (fill)
-- DROP OFFLINEMODEL IF EXISTS m_gbdt_fill_model;
-- DROP TABLE IF EXISTS t_netivs_gbdt_fill_model_feature_importance;
-- -- train
-- PAI -name GBDT 
-- -project algo_public 
-- -DfeatureSplitValueMaxSize="500" 
-- -DlossType="3" 
-- -DrandSeed="0" 
-- -DnewtonStep="1" 
-- -Dshrinkage="0.05" 
-- -DmaxLeafCount="32" 
-- -DlabelColName="power_consumption" 
-- -DinputTableName="t_netivs_gbdt_fill_train_features" 
-- -DoutputImportanceTableName="t_netivs_gbdt_fill_model_feature_importance" 
-- -DminLeafSampleCount="500" 
-- -DsampleRatio="0.6" 
-- -DmaxDepth="10" 
-- -DmodelName="m_gbdt_fill_model" 
-- -DmetricType="0" 
-- -DfeatureRatio="0.6" 
-- -Dp="1" 
-- -Dtau="0.6" 
-- -DtestRatio="0" 
-- -DfeatureColNames="user_id,temperature_high,temperature_low,weather1,weather1_level,weather1_type,weather2,weather2_level,weather2_type,wind_direction,day_index,month_index,year_index,month,day,workday,weekday,holiday,special_workday,special_holiday,day1_before_special_holiday,day2_before_special_holiday,day3_before_special_holiday,day1_before_holiday,day2_before_holiday,day3_before_holiday,day1_after_special_holiday,day2_after_special_holiday,day3_after_special_holiday,day1_after_holiday,day2_after_holiday,day3_after_holiday,wind_speed,wind_speed1,wind_speed2,weekly_power_consumption_avg,weekly_power_consumption_median,monthly_power_consumption_avg,monthly_power_consumption_median,last_weekly_power_consumption_avg,last_weekly_power_consumption_median,last_monthly_power_consumption_avg,last_monthly_power_consumption_median" 
-- -DtreeCount="2000"
-- ;


-- predict
-- DROP TABLE IF EXISTS t_netivs_gbdt_fill_prediction_result;
-- PAI
-- -name prediction
-- -project algo_public
-- -DdetailColName="prediction_detail"
-- -DappendColNames="user_id,day_int"
-- -DmodelName="m_gbdt_fill_model"
-- -DitemDelimiter=","
-- -DresultColName="prediction_result"
-- -Dlifecycle="28"
-- -DoutputTableName="t_netivs_gbdt_fill_prediction_result"
-- -DscoreColName="prediction_score"
-- -DkvDelimiter=":"
-- -DfeatureColNames="user_id,temperature_high,temperature_low,weather1,weather1_level,weather1_type,weather2,weather2_level,weather2_type,wind_direction,day_index,month_index,year_index,month,day,workday,weekday,holiday,special_workday,special_holiday,day1_before_special_holiday,day2_before_special_holiday,day3_before_special_holiday,day1_before_holiday,day2_before_holiday,day3_before_holiday,day1_after_special_holiday,day2_after_special_holiday,day3_after_special_holiday,day1_after_holiday,day2_after_holiday,day3_after_holiday,wind_speed,wind_speed1,wind_speed2,weekly_power_consumption_avg,weekly_power_consumption_median,monthly_power_consumption_avg,monthly_power_consumption_median,last_weekly_power_consumption_avg,last_weekly_power_consumption_median,last_monthly_power_consumption_avg,last_monthly_power_consumption_median"
-- -DinputTableName="t_netivs_gbdt_fill_test_features"
-- -DenableSparse="false";

-- SELECT * FROM t_netivs_gbdt_fill_prediction_result ORDER BY prediction_result DESC limit 500;

-- Use xgb to generate the fill values
DROP TABLE IF EXISTS t_netivs_xgb_fill_prediction_result;
DROP OFFLINEMODEL IF EXISTS m_xgb_fill_model;

-- train
PAI
-name xgboost
-project algo_public
-Deta="0.01"
---Dobjective="reg:linear"
-Dobjective="reg:linear"
-DitemDelimiter=","
-Dseed="0"
-Dnum_round="3500"
-DlabelColName="power_consumption"
-DinputTableName="t_netivs_gbdt_fill_train_features"
-DenableSparse="false"
-Dmax_depth="8"
-Dsubsample="0.4"
-Dcolsample_bytree="0.6"
-DmodelName="m_xgb_fill_model"
-Dgamma="0"
-Dlambda="50" 
-DfeatureColNames="user_id,temperature_high,temperature_low,weather1,weather1_level,weather1_type,weather2,weather2_level,weather2_type,wind_direction,day_index,month_index,year_index,month,day,workday,weekday,holiday,special_workday,special_holiday,day1_before_special_holiday,day2_before_special_holiday,day3_before_special_holiday,day1_before_holiday,day2_before_holiday,day3_before_holiday,day1_after_special_holiday,day2_after_special_holiday,day3_after_special_holiday,day1_after_holiday,day2_after_holiday,day3_after_holiday,wind_speed,wind_speed1,wind_speed2,weekly_power_consumption_avg,weekly_power_consumption_median,monthly_power_consumption_avg,monthly_power_consumption_median,last_weekly_power_consumption_avg,last_weekly_power_consumption_median,last_monthly_power_consumption_avg,last_monthly_power_consumption_median"
-Dbase_score="0.11"
-Dmin_child_weight="100"
-DkvDelimiter=":";


-- predict
PAI
-name prediction
-project algo_public
-DdetailColName="prediction_detail"
-DappendColNames="user_id,day_int"
-DmodelName="m_xgb_fill_model"
-DitemDelimiter=","
-DresultColName="prediction_result"
-Dlifecycle="28"
-DoutputTableName="t_netivs_xgb_fill_prediction_result"
-DscoreColName="prediction_score"
-DkvDelimiter=":"
-DfeatureColNames="user_id,temperature_high,temperature_low,weather1,weather1_level,weather1_type,weather2,weather2_level,weather2_type,wind_direction,day_index,month_index,year_index,month,day,workday,weekday,holiday,special_workday,special_holiday,day1_before_special_holiday,day2_before_special_holiday,day3_before_special_holiday,day1_before_holiday,day2_before_holiday,day3_before_holiday,day1_after_special_holiday,day2_after_special_holiday,day3_after_special_holiday,day1_after_holiday,day2_after_holiday,day3_after_holiday,wind_speed,wind_speed1,wind_speed2,weekly_power_consumption_avg,weekly_power_consumption_median,monthly_power_consumption_avg,monthly_power_consumption_median,last_weekly_power_consumption_avg,last_weekly_power_consumption_median,last_monthly_power_consumption_avg,last_monthly_power_consumption_median"
-DinputTableName="t_netivs_gbdt_fill_test_features"
-DenableSparse="false";

SELECT * FROM t_netivs_xgb_fill_prediction_result ORDER BY prediction_result desc limit 100;

-- Generate the revised daily power consumption detail table
-- All user_ids in t_netivs_clear_historical_data_user_list are cleared
DROP TABLE IF EXISTS t_netivs_fixed_ext_power;
CREATE TABLE IF NOT EXISTS t_netivs_fixed_ext_power AS
	SELECT
		t1.user_id
		,t1.day_int
		--,cast(case when t2.prediction_result is not null then round(t2.prediction_result,0) when t3.user_id is not null then 1 else power_consumption end as bigint) as power_consumption
		,cast(case when t2.prediction_result is not null then round(t2.prediction_result,0) else power_consumption end as bigint) as power_consumption
	FROM
		t_netivs_ext_power t1
	LEFT OUTER JOIN
		t_netivs_gbdt_fill_prediction_result t2
	ON t1.user_id = t2.user_id and t1.day_int = t2.day_int
	LEFT OUTER JOIN
		t_netivs_clear_historical_data_user_list t3
	ON t1.user_id = t3.user_id
;
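The merge step above keeps the original value unless a prediction exists for the same (user_id, day_int) pair. A small Python sketch of that rule follows, assuming both tables fit in dictionaries keyed by that pair; `apply_fill` is a hypothetical name, and the rounding is written as round-half-up on the assumption that SQL `round(x,0)` behaves that way for positive values.

```python
import math


def apply_fill(power, predictions):
    """power and predictions map (user_id, day_int) -> value; returns the revised table."""
    fixed = {}
    for key, original in power.items():
        if key in predictions:
            # round half up for positive values; Python's built-in round()
            # rounds halves to the nearest even integer instead
            fixed[key] = int(math.floor(predictions[key] + 0.5))
        else:
            fixed[key] = original
    return fixed


power = {(1, 20160101): 1, (1, 20160102): 500}
predictions = {(1, 20160101): 498.5}
print(apply_fill(power, predictions))
```

The join key has to include user_id as well as day_int, since a prediction for one user on a given day must not overwrite another user's value for the same day.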

-- Generate the revised total daily power consumption
DROP TABLE IF EXISTS t_netivs_fixed_daily_sum_consumption;
CREATE TABLE IF NOT EXISTS t_netivs_fixed_daily_sum_consumption AS
SELECT
	t1.day_int
	,t1.power_consumption
	,t2.fixed_power_consumption

Last updated: 2017-07-26 09:32:44
