apache-spark - library - pyspark sql functions json



Múltiplas operações agregadas na mesma coluna de um dataframe de centelha (2)

Eu tenho três matrizes do tipo string contendo as seguintes informações:

  • array groupBy: contendo nomes das colunas pelas quais quero agrupar meus dados.
  • matriz agregada: contendo nomes de colunas que quero agregar.
  • matriz de operações: contendo as operações agregadas que quero executar

Eu estou tentando usar quadros de dados de faísca para conseguir isso. Os quadros de dados do Spark fornecem um agg () no qual você pode transmitir um Map [String, String] (do nome da coluna e respectiva operação agregada) como entrada, no entanto, quero executar diferentes operações de agregação na mesma coluna dos dados. Alguma sugestão sobre como conseguir isso?


Para aqueles que se perguntam, como @ resposta zero323 pode ser escrita sem uma compreensão de lista em python:

from pyspark.sql.functions import min, max, col
# init your spark dataframe

expr = [min(col("valueName")),max(col("valueName"))]
df.groupBy("keyName").agg(*expr)

Scala :

Você pode, por exemplo, mapear uma lista de funções com um mapping definido de nome para função:

import org.apache.spark.sql.functions.{col, min, max, mean}
import org.apache.spark.sql.Column

val df = Seq((1L, 3.0), (1L, 3.0), (2L, -5.0)).toDF("k", "v")
val mapping: Map[String, Column => Column] = Map(
  "min" -> min, "max" -> max, "mean" -> avg)

val groupBy = Seq("k")
val aggregate = Seq("v")
val operations = Seq("min", "max", "mean")
val exprs = aggregate.flatMap(c => operations .map(f => mapping(f)(col(c))))

df.groupBy(groupBy.map(col): _*).agg(exprs.head, exprs.tail: _*).show
// +---+------+------+------+
// |  k|min(v)|max(v)|avg(v)|
// +---+------+------+------+
// |  1|   3.0|   3.0|   3.0|
// |  2|  -5.0|  -5.0|  -5.0|
// +---+------+------+------+

ou

df.groupBy(groupBy.head, groupBy.tail: _*).agg(exprs.head, exprs.tail: _*).show

Infelizmente analisador que é usado internamente SQLContext não é exposto publicamente, mas você pode sempre tentar construir consultas SQL simples:

df.registerTempTable("df")
val groupExprs = groupBy.mkString(",")
val aggExprs = aggregate.flatMap(c => operations.map(
  f => s"$f($c) AS ${c}_${f}")
).mkString(",")

sqlContext.sql(s"SELECT $groupExprs, $aggExprs FROM df GROUP BY $groupExprs")

Python :

from pyspark.sql.functions import mean, sum, max, col

df = sc.parallelize([(1, 3.0), (1, 3.0), (2, -5.0)]).toDF(["k", "v"])
groupBy = ["k"]
aggregate = ["v"] 
funs = [mean, sum, max]

exprs = [f(col(c)) for f in funs for c in aggregate]

# or equivalent df.groupby(groupBy).agg(*exprs)
df.groupby(*groupBy).agg(*exprs)

Veja também:

  • Spark SQL: aplica funções agregadas a uma lista de colunas




apache-spark-sql