If the code does not pass the grader, the run may simply have hit an out-of-memory condition; rerun it a few times and it should eventually succeed!

Level 1: Cleaning ride-hailing cancelled-order data

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.functions;

public class TrainClean {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkSession spark = SparkSession.builder().master("local").appName("TrainClean").getOrCreate();

        // 1. Read the .dat files
        Dataset<Row> df = spark.read()
                .option("header", "true")
                .option("delimiter", ",") // 明确指定逗号分隔符
                .csv("/data/workspace/myshixun/ProvOrderCancel/*.dat");

        // 2. Trim whitespace from the column names
        String[] columns = df.columns();
        for (String col : columns) {
            df = df.withColumnRenamed(col, col.trim());
        }

        // 3. Filter on the mandatory fields
        String[] mandatoryColumns = {"companyid", "address", "orderid", "ordertime", "canceltime","cancelreason"};
        Column filterCond = functions.lit(true);

        // Every column must be present (i.e. each raw row has exactly 8 fields)
        String[] allColumns = df.columns();
        for (String col : allColumns) {
            filterCond = filterCond.and(df.col(col).isNotNull()); // the field must be present (non-null)
        }

        // Mandatory fields must also be non-blank after trimming
        for (String col : mandatoryColumns) {
            filterCond = filterCond.and(
                functions.trim(df.col(col)).isNotNull()
                    .and(functions.trim(df.col(col)).notEqual(""))
            );
        }

        // Filter by date prefix: keep only records from 2019-03-07
        filterCond = filterCond.and(df.col("ordertime").startsWith("20190307"))
                            .and(df.col("canceltime").startsWith("20190307"));

        df = df.filter(filterCond);

        // 4. Replace literal "null" values in cancelreason
        df = df.withColumn("cancelreason",
            functions.when(
                functions.trim(functions.lower(df.col("cancelreason"))).equalTo("null"),
                "未知"
            ).otherwise(df.col("cancelreason"))
        );

        // 5. Convert the timestamps to yyyy-MM-dd HH:mm:ss
        df = df.withColumn("ordertime",
                functions.date_format(
                    functions.to_timestamp(df.col("ordertime"), "yyyyMMddHHmmss"),
                    "yyyy-MM-dd HH:mm:ss"
                ))
               .withColumn("canceltime",
                functions.date_format(
                    functions.to_timestamp(df.col("canceltime"), "yyyyMMddHHmmss"),
                    "yyyy-MM-dd HH:mm:ss"
                ));

        // 6. Map district codes to district names via the t_address table
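        //    (the dimension table is deduplicated on address_code below, so the left join cannot multiply order rows)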
        Dataset<Row> addressDF = spark.read()
                .format("jdbc")
                .option("url", "jdbc:mysql://127.0.0.1:3306/mydb")
                .option("dbtable", "t_address")
                .option("user", "root")
                .option("password", "123123")
                .load()
                .withColumn("address_code", functions.trim(functions.col("address_code")))
                .dropDuplicates("address_code");

        df = df.withColumn("address", functions.trim(df.col("address")));
        
        df = df.join(addressDF,
                df.col("address").equalTo(addressDF.col("address_code")),
                "left")
            .withColumn("districtname",
                functions.coalesce(addressDF.col("address_name"), functions.lit("未知")))
            .drop("address_code", "address_name");

        // 7. Deduplicate (keep the first occurrence of each orderid)
        df = df.dropDuplicates("orderid");

        // 8. Select the output columns and sort by orderid
        df = df.select(
                "companyid", "address", "districtname", "orderid",
                "ordertime", "canceltime", "operator", "canceltypecode", "cancelreason"
            ).orderBy("orderid");

        // 9. Write the result as a single pipe-delimited file
        df.coalesce(1)
          .write()
          .option("sep", "|")
          .option("header", false)
          .mode("overwrite")
          .csv("/root/files");

        spark.stop();
    }
}
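As noted at the top, the grader can run out of memory. One optional tweak (an assumption on my part, not required by the exercise) is to lower the shuffle partition count when building the SparkSession, since orderBy and dropDuplicates otherwise default to 200 shuffle partitions. A minimal sketch of the builder line above:

        // Sketch only: reduce shuffle parallelism to cut memory and task overhead in local mode.
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("TrainClean")
                .config("spark.sql.shuffle.partitions", "1") // assumption: the input easily fits in one partition
                .getOrCreate();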

Level 2: Cleaning ride-hailing successful-order data

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.functions;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OrderClean {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkSession spark = SparkSession.builder().master("local").appName("OrderClean").getOrCreate();

        // Read the raw data and assign column names
        Dataset<Row> rawData = spark.read()
                .option("sep", ",")
                .csv("/data/workspace/myshixun/ProvOrderCreate/*")
                .toDF("companyid", "address", "orderid", "departtime", "ordertime", "passengernote", "departure",
                        "deplongitude", "deplatitude", "destination", "destlongitude", "destlatitude", "encrypt_c", "faretype");

        // Keep only the columns we need
        Dataset<Row> filteredData = rawData.select(
                rawData.col("companyid"),
                rawData.col("address"),
                rawData.col("orderid"),
                rawData.col("departtime"),
                rawData.col("ordertime"),
                rawData.col("departure"),
                rawData.col("deplongitude"),
                rawData.col("deplatitude"),
                rawData.col("destination"),
                rawData.col("destlongitude"),
                rawData.col("destlatitude")
        );

        // Convert empty strings to null in every column
        for (String colName : filteredData.columns()) {
            filteredData = filteredData.withColumn(colName,
                    functions.when(functions.col(colName).notEqual(""), functions.col(colName)).otherwise(functions.lit(null)));
        }

        // Register UDFs that insert the missing decimal point into the longitude/latitude values
        UDF1<String, String> processLon = s -> {
            if (s == null || s.isEmpty()) return null;
            return s.length() < 3 ? s : s.substring(0, 3) + "." + s.substring(3);
        };
        spark.udf().register("processLon", processLon, DataTypes.StringType);

        UDF1<String, String> processLat = s -> {
            if (s == null || s.isEmpty()) return null;
            return s.length() < 2 ? s : s.substring(0, 2) + "." + s.substring(2);
        };
        spark.udf().register("processLat", processLat, DataTypes.StringType);

        // Apply the UDFs to the longitude/latitude columns
        filteredData = filteredData.withColumn("deplongitude", functions.callUDF("processLon", functions.col("deplongitude")))
                .withColumn("destlongitude", functions.callUDF("processLon", functions.col("destlongitude")))
                .withColumn("deplatitude", functions.callUDF("processLat", functions.col("deplatitude")))
                .withColumn("destlatitude", functions.callUDF("processLat", functions.col("destlatitude")));

        // Parse the time fields into temporary timestamp columns
        filteredData = filteredData
                .withColumn("ordertime_ts", functions.to_timestamp(functions.col("ordertime"), "yyyyMMddHHmmss"))
                .withColumn("departtime_ts", functions.to_timestamp(functions.col("departtime"), "yyyyMMddHHmmss"));

        // Drop rows containing any null value
        filteredData = filteredData.na().drop();

        // Keep only rows whose order and departure times fall on 2019-03-07
        filteredData = filteredData.filter(
                functions.col("ordertime_ts").between("2019-03-07 00:00:00", "2019-03-07 23:59:59")
                        .and(functions.col("departtime_ts").between("2019-03-07 00:00:00", "2019-03-07 23:59:59"))
        );

        // Format the time fields and drop the temporary columns
        filteredData = filteredData
                .withColumn("ordertime", functions.date_format(functions.col("ordertime_ts"), "yyyy-MM-dd HH:mm:ss"))
                .withColumn("departtime", functions.date_format(functions.col("departtime_ts"), "yyyy-MM-dd HH:mm:ss"))
                .drop("ordertime_ts", "departtime_ts");

        // Read the t_address table from MySQL
        Dataset<Row> addressDF = spark.read()
                .format("jdbc")
                .option("url", "jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=utf-8")
                .option("dbtable", "t_address")
                .option("user", "root")
                .option("password", "123123")
                .load();

        // Collect the address table into a driver-side Map
        List<Row> addressList = addressDF.collectAsList();
        Map<String, String> addressMap = new HashMap<>();
        for (Row row : addressList) {
            addressMap.put(row.getString(0), row.getString(1));
        }
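        // Note: this assumes column 0 of t_address is the district code and column 1 is the district name.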

        // Register a lookup UDF and add the districtname column
        spark.udf().register("getDistrict", (String code) -> addressMap.getOrDefault(code, "未知"), DataTypes.StringType);
        filteredData = filteredData.withColumn("districtname", functions.callUDF("getDistrict", functions.col("address")));

        // Reorder the columns and deduplicate on orderid
        filteredData = filteredData.select(
                functions.col("companyid"),
                functions.col("address"),
                functions.col("districtname"),
                functions.col("orderid"),
                functions.col("departtime"),
                functions.col("ordertime"),
                functions.col("departure"),
                functions.col("deplongitude"),
                functions.col("deplatitude"),
                functions.col("destination"),
                functions.col("destlongitude"),
                functions.col("destlatitude")
        ).dropDuplicates("orderid");

        // Write the result as a single tab-separated file
        filteredData.coalesce(1)
                .write()
                .option("sep", "\t")
                .option("header", false)
                .mode("overwrite")
                .csv("/root/files");

        spark.stop();
    }
}
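To sanity-check the result, a quick sketch (reusing the output path and tab separator from the write above, and placed before spark.stop() in main) could read the file back and preview a few rows:

        // Sketch: re-read the cleaned output and preview it; run this before spark.stop().
        Dataset<Row> check = spark.read()
                .option("sep", "\t")
                .csv("/root/files");
        check.show(5, false); // print the first 5 rows without truncating long values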
