pyspark读取数据

方法一:从hdfs读取

# -*- coding: utf-8 -*

from pyspark.sql import SparkSession, HiveContext,DataFrameWriter

import argparse

import time

import numpy as np

import pandas as pd

spark = SparkSession.builder.enableHiveSupport().appName("test").getOrCreate()

start = time.time()

### 数据载入方法1: hdfs上载入parquent格式

input = "/aaa/bbb/ccc"

data = spark.read.parquet(input)

data.show(5)

+-------------------+------+--------------------+

| START_TIME|amount| payerCode|

+-------------------+------+--------------------+

|2019-06-28 21:04:37| 10.7|692200000XXXXXXX|

|2018-11-24 20:15:40| 19.9|602200000XXXXXXX|

|2019-06-19 12:33:14| 2.0|692200000XXXXXXX|

|2019-07-03 23:04:12| 5.27|622200000XXXXXXX|

|2018-11-26 21:26:30| 2.0|622200000XXXXXXX|

+-------------------+------+--------------------+

## pyspark读取数据方法二:从hive中读取

方法二:数据从数据库读取

####### 生成查询的SQL语句,这个跟hive的查询语句一样,所以也可以加where等条件语句

hive_context= HiveContext(spark)

hive_read = "select * from {}.{}".format(hive_database, hive_table2)

####### 通过SQL语句在hive中查询的数据直接是dataframe的形式

read_df = hive_context.sql(hive_read)

read_df.show(5)

+-------------------+------+--------------------+

| START_TIME|amount| payerCode|

+-------------------+------+--------------------+

|2019-06-28 21:04:37| 10.7|692200000XXXXXXX|

|2018-11-24 20:15:40| 19.9|602200000XXXXXXX|

|2019-06-19 12:33:14| 2.0|692200000XXXXXXX|

|2019-07-03 23:04:12| 5.27|622200000XXXXXXX|

|2018-11-26 21:26:30| 2.0|622200000XXXXXXX|

+-------------------+------+--------------------+

方法3:读取hdfs上的csv文件

tttt = spark.read.csv(filepath,header=’true’,inferSchema=’true’,sep=’,’)

pyspark数据存储

方法1: 以parquent格式存储到hdfs

data1.write.mode(SaveMode.Overwrite).parquet(output)

方法2:以Table的格式存入hive数据库

##### 数据存入数据库

hive_database = "testt0618"

data1 = data.limit(10)

1: 用saveAsTable()方法存入hive数据库

hive_table1 = "ii"

data1.write.format("hive").mode("overwrite").saveAsTable('{}.{}'.format(hive_database, hive_table1))

2:利用sql语句存入hive数据库

hive_table2 = "lll"

data1.registerTempTable('test_hive')

sqlContext.sql("create table {}.{} select * from test_hive".format(hive_database, hive_table2))

方法3:以csv格式存储到hdfs

output = “/aaa/bbb/ccc”

data1.coalesce(1).write.option("sep", "#").option("header", "true").csv(output + "_text",mode='overwrite')

参考相关:

www.zzvips.com/article/73466.html

https://zhuanlan.zhihu.com/p/34901558

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐