스파크 데이터 프레임에 새 열을 추가하려면 어떻게 해야 합니까(PySpark 사용)?
Spark DataFrame(PySpark 1.5.1 사용)이 있는데 새 열을 추가하려고 합니다.
저는 다음을 시도했지만 성공하지 못했습니다.
type(randomed_hours) # => list
# Create in Python and transform to RDD
new_col = pd.DataFrame(randomed_hours, columns=['new_col'])
spark_new_col = sqlContext.createDataFrame(new_col)
my_df_spark.withColumn("hours", spark_new_col["new_col"])
또한 다음을 사용하여 오류가 발생했습니다.
my_df_spark.withColumn("hours", sc.parallelize(randomed_hours))
PySpark를 사용하여 기존 DataFrame에 새 열(Python 벡터 기반)을 추가하려면 어떻게 해야 합니까?
임의 열을 다음에 추가할 수 없습니다.DataFrame
스파크에서.리터럴을 사용해야만 새 열을 만들 수 있습니다(다른 리터럴 유형은 스파크 데이터 프레임에 상수 열을 추가하는 방법에 설명되어 있습니다).
from pyspark.sql.functions import lit
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()
## +---+---+-----+---+
## | x1| x2| x3| x4|
## +---+---+-----+---+
## | 1| a| 23.0| 0|
## | 3| B|-23.0| 0|
## +---+---+-----+---+
기존 열 변환:
from pyspark.sql.functions import exp
df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()
## +---+---+-----+---+--------------------+
## | x1| x2| x3| x4| x5|
## +---+---+-----+---+--------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9|
## | 3| B|-23.0| 0|1.026187963170189...|
## +---+---+-----+---+--------------------+
사용하여 포함join
:
from pyspark.sql.functions import exp
lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
.join(lookup, col("x1") == col("k"), "leftouter")
.drop("k")
.withColumnRenamed("v", "x6"))
## +---+---+-----+---+--------------------+----+
## | x1| x2| x3| x4| x5| x6|
## +---+---+-----+---+--------------------+----+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|
## | 3| B|-23.0| 0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+
또는 함수 / udf로 생성됨:
from pyspark.sql.functions import rand
df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()
## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2| x3| x4| x5| x6| x7|
## +---+---+-----+---+--------------------+----+-------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617|
## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+
성능 측면에서 기본 제공되는 기능)pyspark.sql.functions
Catalyst 식에 매핑되는 )는 일반적으로 Python 사용자 정의 함수보다 선호됩니다.
임의 RDD의 내용을 열로 추가하려면 다음을 수행합니다.
- 기존 데이터 프레임에 행 번호 추가
- 불러
zipWithIndex
RDD에서 데이터 프레임으로 변환합니다. - 인덱스를 조인 키로 사용하여 둘 다 조인
UDF를 사용하여 열을 추가하는 방법
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def valueToCategory(value):
if value == 1: return 'cat1'
elif value == 2: return 'cat2'
...
else: return 'n/a'
# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
df_with_cat.show()
## +---+---+-----+---------+
## | x1| x2| x3| category|
## +---+---+-----+---------+
## | 1| a| 23.0| cat1|
## | 3| B|-23.0| n/a|
## +---+---+-----+---------+
# assumes schema has 'age' column
df.select('*', (df.age + 10).alias('agePlusTen'))
pySpark에 새 열을 추가할 수 있는 방법은 여러 가지가 있습니다.
먼저 간단한 데이터 프레임을 생성합니다.
date = [27, 28, 29, None, 30, 31]
df = spark.createDataFrame(date, IntegerType())
이제 열 값을 두 배로 늘려서 새 열에 저장해 보겠습니다. PFB는 몇 가지 다른 접근 방식을 사용하여 동일한 값을 달성합니다.
# Approach - 1 : using withColumn function
df.withColumn("double", df.value * 2).show()
# Approach - 2 : using select with alias function.
df.select("*", (df.value * 2).alias("double")).show()
# Approach - 3 : using selectExpr function with as clause.
df.selectExpr("*", "value * 2 as double").show()
# Approach - 4 : Using as clause in SQL statement.
df.createTempView("temp")
spark.sql("select *, value * 2 as double from temp").show()
스파크 데이터 프레임 기능에 대한 더 많은 예시와 설명은 제 블로그를 방문하시면 됩니다.
이것이 도움이 되길 바랍니다.
기존 열을 기준으로 채울 일부 사용자 지정 값 또는 동적 값 계산을 사용하여 새 열을 추가합니다.
예.
|ColumnA | ColumnB |
|--------|---------|
| 10 | 15 |
| 10 | 20 |
| 10 | 30 |
C열을 A열+B열로 변경
|ColumnA | ColumnB | ColumnC|
|--------|---------|--------|
| 10 | 15 | 25 |
| 10 | 20 | 30 |
| 10 | 30 | 40 |
사용.
#to add new column
def customColumnVal(row):
rd=row.asDict()
rd["ColumnC"]=row["ColumnA"] + row["ColumnB"]
new_row=Row(**rd)
return new_row
#convert DF to RDD
df_rdd= input_dataframe.rdd
#apply new fucntion to rdd
output_dataframe=df_rdd.map(customColumnVal).toDF()
input_dataframe
수정될 데이터 프레임입니다.customColumnVal
함수에 새 열을 추가할 코드가 있습니다.
아래 단계를 통해 DataFrame에 열을 직접 추가할 수 있습니다.
from pyspark.sql.functions import when
df = spark.createDataFrame([["amit", 30], ["rohit", 45], ["sameer", 50]], ["name", "age"])
df = df.withColumn("profile", when(df.age >= 40, "Senior").otherwise("Executive"))
df.show()
새 항목을 정의할 수 있습니다.udf
를 추가할 때column_name
:
u_f = F.udf(lambda :yourstring,StringType())
a.select(u_f().alias('column_name')
from pyspark.sql.functions import udf
from pyspark.sql.types import *
func_name = udf(
lambda val: val, # do sth to val
StringType()
)
df.withColumn('new_col', func_name(df.old_col))
저는 매우 유사한 사용 사례에 대한 일반적인 예를 제시하고자 합니다.
사용 사례:다음으로 구성된 CSV가 있습니다.
First|Third|Fifth
data|data|data
data|data|data
...billion more lines
몇 가지 변환을 수행해야 하며 최종 CSV는 다음과 같이 표시되어야 합니다.
First|Second|Third|Fourth|Fifth
data|null|data|null|data
data|null|data|null|data
...billion more lines
이 작업은 일부 모델에서 정의한 스키마이므로 SQL Bulk Insert와 같은 최종 데이터를 상호 운용할 수 있어야 합니다.
그래서:
저는 spark.read를 사용하여 원본 csv를 읽고 "df"라고 부릅니다.
저는 데이터에 뭔가를 합니다.
다음 스크립트를 사용하여 null 열을 추가합니다.
outcols = []
for column in MY_COLUMN_LIST:
if column in df.columns:
outcols.append(column)
else:
outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column)))
df = df.select(outcols)
이러한 방식으로 CSV를 로드한 후 스키마를 구성할 수 있습니다(여러 테이블에 대해 이 작업을 수행해야 하는 경우 열 순서를 다시 지정할 수도 있습니다).
열을 추가하는 가장 간단한 방법은 "열 포함"을 사용하는 것입니다.데이터 프레임은 sqlContext를 사용하여 생성되므로 스키마를 지정해야 합니다. 그렇지 않으면 기본적으로 데이터 집합에서 사용할 수 있습니다.스키마를 지정하면 매번 변경할 때마다 워크로드가 지루해집니다.
다음은 고려할 수 있는 예입니다.
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc) # SparkContext will be sc by default
# Read the dataset of your choice (Already loaded with schema)
Data = sqlContext.read.csv("/path", header = True/False, schema = "infer", sep = "delimiter")
# For instance the data has 30 columns from col1, col2, ... col30. If you want to add a 31st column, you can do so by the following:
Data = Data.withColumn("col31", "Code goes here")
# Check the change
Data.printSchema()
pyspark 3.2 이상에서 다음을 사용할 수 있습니다.
my_df_spark.pandas_api().assign(hours=spark_new_col.pandas_api()['new_col']).to_spark().show()
언급URL : https://stackoverflow.com/questions/33681487/how-do-i-add-a-new-column-to-a-spark-dataframe-using-pyspark
'programing' 카테고리의 다른 글
MongoDB에서 ObjectId와 해당 문자열 형식의 저장 차이 (0) | 2023.06.24 |
---|---|
Oracle의 ALL_TAB_COLUMNS 테이블에 있는 BIN$... 테이블은 무엇입니까? (0) | 2023.06.24 |
'id'는 Python에서 잘못된 변수 이름입니다. (0) | 2023.06.24 |
Mongodb 데이터베이스 노드 모의/테스트.js (0) | 2023.06.24 |
각 열 셀에 대한 열 셀 확장 (0) | 2023.06.24 |