How to write multiple WHEN conditions for a Spark dataframe?

I have to join two data frames and select all of their columns based on some conditions. Here is an example:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext.implicits._
import org.apache.spark.{SparkConf, SparkContext}
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract

val get_cus_val = sqlContext.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3))

val rdd = sc.textFile("s3://trfsmallfffile/FinancialLineItem/MAIN")
val header = rdd.filter(_.contains("LineItem.organizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)

val schemaHeader = StructType(header.map(cols => StructField(cols.replace(".", "."), StringType)).toSeq)
val dataHeader = sqlContext.createDataFrame(rdd.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schemaHeader)

val df1resultFinal = data.withColumn("DataPartition", get_cus_val(input_file_name))

val rdd1 = sc.textFile("s3://trfsmallfffile/FinancialLineItem/INCR")
val header1 = rdd1.filter(_.contains("LineItem.organizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema1 = StructType(header1.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data1 = sqlContext.createDataFrame(rdd1.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema1)


import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("LineItem_organizationId", "LineItem_lineItemId").orderBy($"TimeStamp".cast(LongType).desc) 
val latestForEachKey = data1.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")


val dfMainOutput = df1resultFinal.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer")
      .select($"LineItem_organizationId", $"LineItem_lineItemId",
        when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition").as("DataPartition"),
        when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"),
        when($"LineItemName_1".isNotNull, $"LineItemName_1").otherwise($"LineItemName").as("LineItemName"),
        when($"LocalLanguageLabel_1".isNotNull, $"LocalLanguageLabel_1").otherwise($"LocalLanguageLabel").as("LocalLanguageLabel"),
        when($"FinancialConceptLocal_1".isNotNull, $"FinancialConceptLocal_1").otherwise($"FinancialConceptLocal").as("FinancialConceptLocal"),
        when($"FinancialConceptGlobal_1".isNotNull, $"FinancialConceptGlobal_1").otherwise($"FinancialConceptGlobal").as("FinancialConceptGlobal"),
        when($"IsDimensional_1".isNotNull, $"IsDimensional_1").otherwise($"IsDimensional").as("IsDimensional"),
        when($"InstrumentId_1".isNotNull, $"InstrumentId_1").otherwise($"InstrumentId").as("InstrumentId"),
        when($"LineItemSequence_1".isNotNull, $"LineItemSequence_1").otherwise($"LineItemSequence").as("LineItemSequence"),
        when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"),
        when($"FinancialConceptCodeGlobalSecondary_1".isNotNull, $"FinancialConceptCodeGlobalSecondary_1").otherwise($"FinancialConceptCodeGlobalSecondary").as("FinancialConceptCodeGlobalSecondary"),
        when($"IsRangeAllowed_1".isNotNull, $"IsRangeAllowed_1").otherwise($"IsRangeAllowed").as("IsRangeAllowed"),
        when($"IsSegmentedByOrigin_1".isNotNull, $"IsSegmentedByOrigin_1").otherwise($"IsSegmentedByOrigin".cast(DataTypes.StringType)).as("IsSegmentedByOrigin"),
        when($"SegmentGroupDescription_1".isNotNull, $"SegmentGroupDescription_1").otherwise($"SegmentGroupDescription").as("SegmentGroupDescription"),
        when($"SegmentChildDescription_1".isNotNull, $"SegmentChildDescription_1").otherwise($"SegmentChildDescription").as("SegmentChildDescription"),
        when($"SegmentChildLocalLanguageLabel_1".isNotNull, $"SegmentChildLocalLanguageLabel_1").otherwise($"SegmentChildLocalLanguageLabel").as("SegmentChildLocalLanguageLabel"),
        when($"LocalLanguageLabel_languageId_1".isNotNull, $"LocalLanguageLabel_languageId_1").otherwise($"LocalLanguageLabel_languageId").as("LocalLanguageLabel_languageId"),
        when($"LineItemName_languageId_1".isNotNull, $"LineItemName_languageId_1").otherwise($"LineItemName_languageId").as("LineItemName_languageId"),
        when($"SegmentChildDescription_languageId_1".isNotNull, $"SegmentChildDescription_languageId_1").otherwise($"SegmentChildDescription_languageId").as("SegmentChildDescription_languageId"),
        when($"SegmentChildLocalLanguageLabel_languageId_1".isNotNull, $"SegmentChildLocalLanguageLabel_languageId_1").otherwise($"SegmentChildLocalLanguageLabel_languageId").as("SegmentChildLocalLanguageLabel_languageId"),
        when($"SegmentGroupDescription_languageId_1".isNotNull, $"SegmentGroupDescription_languageId_1").otherwise($"SegmentGroupDescription_languageId").as("SegmentGroupDescription_languageId"),
        when($"SegmentMultipleFundbDescription_1".isNotNull, $"SegmentMultipleFundbDescription_1").otherwise($"SegmentMultipleFundbDescription").as("SegmentMultipleFundbDescription"),
        when($"SegmentMultipleFundbDescription_languageId_1".isNotNull, $"SegmentMultipleFundbDescription_languageId_1").otherwise($"SegmentMultipleFundbDescription_languageId").as("SegmentMultipleFundbDescription_languageId"),
        when($"IsCredit_1".isNotNull, $"IsCredit_1").otherwise($"IsCredit").as("IsCredit"),
        when($"FinancialConceptLocalId_1".isNotNull, $"FinancialConceptLocalId_1").otherwise($"FinancialConceptLocalId").as("FinancialConceptLocalId"),
        when($"FinancialConceptGlobalId_1".isNotNull, $"FinancialConceptGlobalId_1").otherwise($"FinancialConceptGlobalId").as("FinancialConceptGlobalId"),
        when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"),
        when($"FFAction_1".isNotNull, $"FFAction_1").otherwise($"FFAction|!|").as("FFAction|!|"))
        .filter(!$"FFAction|!|".contains("D|!|"))

val dfMainOutputFinal = dfMainOutput.na.fill("").select($"DataPartition",$"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated"))

val headerColumn = dataHeader.columns.toSeq

val header = headerColumn.mkString("", "|^|", "|!|").dropRight(3)

// Note: | and ^ are regex metacharacters, so the literal delimiter must be escaped.
val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "\\|\\^\\|null", "")).withColumnRenamed("concatenated", header)


dfMainOutputFinalWithoutNull.write.partitionBy("DataPartition","StatementTypeCode")
  .format("csv")
  .option("nullValue", "")
  .option("delimiter", "\t")
  .option("quote", "\u0000")
  .option("header", "true")
  .option("codec", "gzip")
  .save("s3://trfsmallfffile/FinancialLineItem/output")

Now I have to write the when condition for every column explicitly. Is there any way to avoid repeating the when condition for all columns?

In my data, null values in the columns arrive as the literal String "null", so applying coalesce directly can be difficult.
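One way to avoid spelling out a when per column is to generate the expressions in a loop. And if the obstacle to coalesce is that missing values arrive as the literal String "null" (as in the data frame 2 sample below), they can be normalized to real NULLs first. A minimal sketch under those assumptions; nullify, joined, mergedCols, keyCols and dfMerged are illustrative names, not from the original code:

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions.{coalesce, col, lit, when}

    // Treat the literal String "null" as a real SQL NULL so coalesce can fall through it.
    def nullify(c: Column): Column = when(c === "null", lit(null)).otherwise(c)

    val joined = df1resultFinal.join(latestForEachKey,
      Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer")

    // For every "<name>_1" column, prefer it over its "<name>" counterpart when it is not null.
    val mergedCols: Seq[Column] = joined.columns.filter(_.endsWith("_1")).toSeq.map { incr =>
      val base = incr.dropRight(2)
      coalesce(nullify(col(incr)), col(base)).as(base)
    }

    val keyCols = Seq(col("LineItem_organizationId"), col("LineItem_lineItemId"))
    val dfMerged = joined.select(keyCols ++ mergedCols: _*)

The one irregular pair, FFAction_1 against FFAction|!|, does not follow the suffix convention and would still have to be special-cased, exactly as the columnMap in the attempt below already does.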

Here is data frame one:

LineItem.organizationId|^|LineItem.lineItemId|^|StatementTypeCode|^|LineItemName|^|LocalLanguageLabel|^|FinancialConceptLocal|^|FinancialConceptGlobal|^|IsDimensional|^|InstrumentId|^|LineItemSequence|^|PhysicalMeasureId|^|FinancialConceptCodeGlobalSecondary|^|IsRangeAllowed|^|IsSegmentedByOrigin|^|SegmentGroupDescription|^|SegmentChildDescription|^|SegmentChildLocalLanguageLabel|^|LocalLanguageLabel.languageId|^|LineItemName.languageId|^|SegmentChildDescription.languageId|^|SegmentChildLocalLanguageLabel.languageId|^|SegmentGroupDescription.languageId|^|SegmentMultipleFundbDescription|^|SegmentMultipleFundbDescription.languageId|^|IsCredit|^|FinancialConceptLocalId|^|FinancialConceptGlobalId|^|FinancialConceptCodeGlobalSecondaryId|^|FFAction|!|
4295879842|^|1246|^|CUS|^|Net Sales-Customer Segment|^|相手先別の販売高(相手先別)|^|JCSNTS|^|REXM|^|False|^||^||^||^||^|False|^|False|^|CUS_JCSNTS|^||^||^|505126|^|505074|^|505074|^|505126|^|505126|^||^|505074|^|True|^|3020155|^|3015249|^||^|I|!|

Here is my data frame 2:

DataPartition_1|^|TimeStamp|^|LineItem.organizationId|^|LineItem.lineItemId|^|StatementTypeCode_1|^|LineItemName_1|^|LocalLanguageLabel_1|^|FinancialConceptLocal_1|^|FinancialConceptGlobal_1|^|IsDimensional_1|^|InstrumentId_1|^|LineItemSequence_1|^|PhysicalMeasureId_1|^|FinancialConceptCodeGlobalSecondary_1|^|IsRangeAllowed_1|^|IsSegmentedByOrigin_1|^|SegmentGroupDescription_1|^|SegmentChildDescription_1|^|SegmentChildLocalLanguageLabel_1|^|LocalLanguageLabel.languageId_1|^|LineItemName.languageId_1|^|SegmentChildDescription.languageId_1|^|SegmentChildLocalLanguageLabel.languageId_1|^|SegmentGroupDescription.languageId_1|^|SegmentMultipleFundbDescription_1|^|SegmentMultipleFundbDescription.languageId_1|^|IsCredit_1|^|FinancialConceptLocalId_1|^|FinancialConceptGlobalId_1|^|FinancialConceptCodeGlobalSecondaryId_1|^|FFAction_1
SelfSourcedPublic|^|1511869196612|^|4295902451|^|10|^|BAL|^|Short term notes payable - related party|^|null|^|null|^|LSOD|^|false|^|null|^|null|^|null|^|null|^|false|^|false|^|null|^|null|^|null|^|null|^|505074|^|null|^|null|^|null|^|null|^|null|^|null|^|null|^|3019157|^|null|^|I|!|

This is what I have tried so far:

println("Enterin In to Spark Mode ")

    val conf = new SparkConf().setAppName("FinanicalLineItem").setMaster("local");
    val sc = new SparkContext(conf); //Creating spark context
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)


    val mainFileURL = "C://Users//u6034690//Desktop//SPARK//trfsmallfffile//FinancialLineItem//MAIN"
    val incrFileURL = "C://Users//u6034690//Desktop//SPARK//trfsmallfffile//FinancialLineItem//INCR"
    val outputFileURL = "C://Users//u6034690//Desktop//SPARK//trfsmallfffile//FinancialLineItem//output"
    val descrFileURL = "C://Users//u6034690//Desktop//SPARK//trfsmallfffile//FinancialLineItem//Descr"

    val src = new Path(outputFileURL)
    val dest = new Path(mainFileURL)
    val hadoopconf = sc.hadoopConfiguration
    val fs = src.getFileSystem(hadoopconf)

    sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

    sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

    myUtil.Utility.DeleteOuptuFolder(fs, outputFileURL)
    myUtil.Utility.DeleteDescrFolder(fs, descrFileURL)

    import sqlContext.implicits._

    val rdd = sc.textFile(mainFileURL)
    val header = rdd.filter(_.contains("LineItem.organizationId")).map(line => line.split("\\|\\^\\|")).first()
    val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
    val data = sqlContext.createDataFrame(rdd.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)

    val schemaHeader = StructType(header.map(cols => StructField(cols.replace(".", "."), StringType)).toSeq)
    val dataHeader = sqlContext.createDataFrame(rdd.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schemaHeader)

    val get_cus_val = sqlContext.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3))

    val columnsNameArray = schema.fieldNames

    val df1resultFinal = data.withColumn("DataPartition", get_cus_val(input_file_name))
    val rdd1 = sc.textFile(incrFileURL)
    val header1 = rdd1.filter(_.contains("LineItem.organizationId")).map(line => line.split("\\|\\^\\|")).first()
    val schema1 = StructType(header1.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
    val data1 = sqlContext.createDataFrame(rdd1.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema1)

    val windowSpec = Window.partitionBy("LineItem_organizationId", "LineItem_lineItemId").orderBy($"TimeStamp".cast(LongType).desc)
    val latestForEachKey = data1.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")

    val columnMap = latestForEachKey.columns
      .filter(_.endsWith("_1"))
      .map(c => c -> c.dropRight(2))
      .toMap + ("FFAction_1" -> "FFAction|!|")


        val exprs = columnMap.map(t => coalesce(col(s"${t._1}"), col(s"${t._2}")).as(s"${t._2}"))
        val exprsExtended = exprs ++ Array(col("LineItem_organizationId"), col("LineItem_lineItemId"))
        println(exprsExtended)
        val df2 = data.select(exprsExtended: _*) // this line has a compilation issue

type mismatch; found : scala.collection.immutable.Iterable[org.apache.spark.sql.Column] required: Seq[?]
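The type mismatch happens because columnMap is a Map, so its .map returns a scala.collection.immutable.Iterable[Column], while select(cols: _*) expects a Seq. Converting to a Seq first should make it compile; a sketch (untested against the original build), which also selects from the joined frame, since the expressions reference both the _1 columns and their base counterparts and data alone contains only the latter:

    import org.apache.spark.sql.functions.{coalesce, col}

    val exprs = columnMap.toSeq.map { case (incr, base) =>
      coalesce(col(incr), col(base)).as(base)
    }
    val exprsExtended = exprs ++ Seq(col("LineItem_organizationId"), col("LineItem_lineItemId"))

    // Select from a frame that has both column families, e.g. the outer join of the two inputs.
    val df2 = df1resultFinal
      .join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer")
      .select(exprsExtended: _*)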

Also, when I print exprsExtended, I am getting backticks in my output columns:

coalesce(LineItemSequence_1, LineItemSequence) AS `LineItemSequence`,
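The backticks are most likely harmless: Column.toString quotes an alias with backticks when the expression is printed, but they are not part of the resulting column name. A quick check (sketch):

    import org.apache.spark.sql.functions.{coalesce, col}

    val c = coalesce(col("LineItemSequence_1"), col("LineItemSequence")).as("LineItemSequence")
    println(c) // prints: coalesce(LineItemSequence_1, LineItemSequence) AS `LineItemSequence`
    // After a select, the schema carries the plain name:
    // joined.select(c).columns  ->  Array("LineItemSequence")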

Source: https://stackoverflow.com/questions/48518189
Updated: 2022-02-17 07:02
