问题:
实验1:
该系共有多少学生
tom的课程平均分
该系每名同学选修的课程门数
计算各个课程的平均成绩
计算DataBase课程成绩的众数
计算各课程的平均分,并基于此绘制柱状图
实验2:
生成班级与姓名的数据集class.csv
针对两个数据文件class.csv、chapter4-data01.txt,计算各个班级的成绩的平均分,中位数,众数,并绘制柱状图
实验前准备:
- 先把文件传到虚拟机上(可用FileZilla Client)
- chapter4-data01.txt文件,创建class.csv文件如下:
chapter4-data01.txt 和 class.txt
实验1代码:
import matplotlib.pyplot as plt
import os
from pyspark import SparkConf,SparkContext
#先配置java环境
os.environ["JAVA_HOME"]="/usr/local/SparkWorkSpace/jdk1.8.0_162"
os.environ["PYSPARK_PYTHON"]='/usr/bin/python3.6'
#初始化SparkConf时,或者提交Spark任务时,都会有master参数需要设置
#Local模式就是运行在一台计算机上的模式,通常就是用于在本机上练手和测试
conf = SparkConf().setMaster("local").setAppName("repeat")
sc=SparkContext(conf=conf)
print(sc.version)#测试是否连接成功
#spark部分
filepath= r'file:///home/hadoop/downloads/chapter4-data01.txt'
rdd = sc.textFile(filepath)
rdd1 = rdd.map(lambda x:x.split(","))#单纯分
#rdd1.foreach(print)
rdd_name = rdd.map(lambda x:x.split(",")[0])#人名
rdd_score = rdd.map(lambda x:(x.split(",")[0],int(x.split(",")[2])))#人名和分、键值对形式
rdd_course = rdd.map(lambda x:x.split(",")).map(lambda x:(x[1],(int(x[2]),1)))#(课程,(分,1))、键值对形式 #rdd_score.foreach(print)
rdd_ji = rdd1.map(lambda x: (x[0],1)) #人名,记数
#1.共有多少个学生
student_num = set(rdd_name.collect())
print('共有',len(student_num),'名学生')
#2、tom的课程平均分
mean_score=rdd_score.filter(lambda x : 'Tom' in x).reduceByKey(lambda a,b: a+b).map(lambda y: y[1]/5)
#print(type(mean_score))
print('Tom的课程平均分',mean_score.collect()[0])
#3、该系每名同学选修的课程门数
course = rdd_ji.reduceByKey(lambda a,b:a+b)
#print('课程平均分;')
print('每名同学的课程门数:')
#course.foreach(print)
#print(course.collect())
#4、计算各个课程的平均成绩
#all_mean_score = rdd_course.combineByKey(lambda income: (income,1),lambda acc,income: (acc[0]+income,acc[1]+1,lambda acc1,acc2: (acc1[0]+acc2[0],acc1[1]+acc2[1]))).map(lambda x:(x[0],x[1][0]/float(x[1][1])))
all_mean_score = rdd_course.reduceByKey(lambda a,b:(a[0]+b[0],a[1]+b[1])).map(lambda x:(x[0],round(x[1][0]/x[1][1],2)))
print('各课程的平均成绩:')
all_mean_score.foreach(print)
#5、计算DataBase课程成绩的众数
zongshu = rdd1.filter(lambda x:x[1]=='DataBase').map(lambda x:(x[2],1)).reduceByKey(lambda a,b:a+b).sortBy(lambda x:x[1],False).first()
#zongshu.foreach(print)
print('众数:',int(zongshu[0]))
#6. 计算各课程的平均分,并基于此绘制柱状图
x_data=[]#横坐标数据
y_data=[]#竖坐标数据
for ii in range(len(all_mean_score.collect())):
x_data.append(all_mean_score.collect()[ii][0])
y_data.append(all_mean_score.collect()[ii][1])
print(x_data)#查看是否单列出来
print(y_data)
plt.bar(x=x_data,height=y_data,color='steelblue',alpha=0.8)#作柱状图
plt.xticks(rotation='vertical')#因为横坐标文字重复,将其设置为竖排
plt.xlabel('Class')
plt.ylabel('Mean_score')
plt.title('Class_Meanscore')
plt.show()
实验2代码:
import pandas as pd
import random
import matplotlib.pyplot as plt
import os
from pyspark import SparkConf,SparkContext
#先配置java环境
os.environ["JAVA_HOME"]="/usr/local/SparkWorkSpace/jdk1.8.0_162"
os.environ["PYSPARK_PYTHON"]='/usr/bin/python3.6'
#初始化SparkConf时,或者提交Spark任务时,都会有master参数需要设置
#Local模式就是运行在一台计算机上的模式,通常就是用于在本机上练手和测试
conf = SparkConf().setMaster("local").setAppName("repeat_实验2")
sc=SparkContext(conf=conf)
print(sc.version)#测试是否连接成功
#连接数据表class.csv
filepath= r'file:///home/hadoop/downloads/class.csv'
rdd = sc.textFile(filepath)
filepath_data= r'file:///home/hadoop/downloads/chapter4-data01.txt'
rdd_data = sc.textFile(filepath_data)
#对class表操作
rdd1 = rdd.map(lambda x:x.split(",")).map(lambda x:(x[0],x[1]))#单纯分
#rdd1.foreach(print)#查看转成rdd后的样子,每个元素是【】列表
#对data表操作
rdd1_data = rdd_data.map(lambda x:x.split(",")).map(lambda x:(x[0],(x[1],x[2])))#先以逗号分开,再制成元组
#rdd1_data.foreach(print)
#对data表重新操作,只保留(名字,成绩)
rdd1_data_re = rdd_data.map(lambda x:x.split(",")).map(lambda x: (x[0],int(x[2])))
#rdd1_data_re.foreach(print)
#要进行班级的平均分之类的运算,考虑连接两个表
rdd_join = rdd1.join(rdd1_data_re)
#rdd_join.foreach(print)
#只取班级和成绩
rdd_class = rdd_join.map(lambda x:x[1])
#rdd_class.foreach(print)
#算总分:
rdd_sum = rdd_class.reduceByKey(lambda a,b:a+b).sortByKey(True)
#rdd_sum.foreach(print)
#算总分,但用的groupByKey(),(x,(1,1,2,3))
rdd_sum_other = rdd_class.groupByKey().map(lambda x:(x[0],x[1])).sortByKey(True)
#rdd_sum_other.foreach(print)
#分数项数:
leng = rdd_sum_other.map(lambda x:len(x[1]))
#print(leng.collect())
#平均分:
mean = rdd_sum_other.map(lambda x:(x[0],x[1])).map(lambda x : (x[0],round(sum(x[1])/len(x[1]),2)))
#print(mean.collect())
#print('各班级平均分:')
#mean.foreach(print)
#中位数:
mid = rdd_sum_other.map(lambda x:(x[0],sorted(x[1]))).map(lambda x:(x[0],x[1][len(x[1])//2]))
#print('各班级中位数')
#mid.foreach(print)
#print(mid.collect())
#众数:
zong = rdd_class.map(lambda x: ((x[0],x[1]),1)).reduceByKey(lambda a,b:a+b).sortBy(lambda x:x[1],False)
class1=zong.filter(lambda x:x[0][0]=='class01').map(lambda x:(x[0][0],x[0][1])).first()
class2=zong.filter(lambda x:x[0][0]=='class02').map(lambda x:(x[0][0],x[0][1])).first()
class3=zong.filter(lambda x:x[0][0]=='class03').map(lambda x:(x[0][0],x[0][1])).first()
class4=zong.filter(lambda x:x[0][0]=='class04').map(lambda x:(x[0][0],x[0][1])).first()
class5=zong.filter(lambda x:x[0][0]=='class05').map(lambda x:(x[0][0],x[0][1])).first()
class6=zong.filter(lambda x:x[0][0]=='class06').map(lambda x:(x[0][0],x[0][1])).first()
class7=zong.filter(lambda x:x[0][0]=='class07').map(lambda x:(x[0][0],x[0][1])).first()
class8=zong.filter(lambda x:x[0][0]=='class08').map(lambda x:(x[0][0],x[0][1])).first()
class9=zong.filter(lambda x:x[0][0]=='class09').map(lambda x:(x[0][0],x[0][1])).first()
class10=zong.filter(lambda x:x[0][0]=='class010').map(lambda x:(x[0][0],x[0][1])).first()
class11=zong.filter(lambda x:x[0][0]=='class011').map(lambda x:(x[0][0],x[0][1])).first()
#zong.foreach(print)
#print('各班级众数:')
zong_list = [class1,class2,class3,class4,class5,class6,class7,class8,class9,class10,class11]
#print(zong_list)
#画柱状图(平均分):
mean_x=[]#横坐标数据
mean_y=[]#竖坐标数据
for ii in range(len(mean.collect())):
mean_x.append(mean.collect()[ii][0])
mean_y.append(mean.collect()[ii][1])
#print(mean_x)#查看是否单列出来
#print(mean_y)
plt.bar(x=mean_x,height=mean_y,color='steelblue',alpha=0.8)#作柱状图
plt.xticks(rotation='vertical')#因为横坐标文字重复,将其设置为竖排
plt.xlabel('Class')
plt.ylabel('Mean_score')
plt.title('Class_Meanscore')
plt.show()
#画柱状图(中位数):
mid_x=[]#横坐标数据
mid_y=[]#竖坐标数据
for jj in range(len(mid.collect())):
mid_x.append(mid.collect()[jj][0])
mid_y.append(mid.collect()[jj][1])
#print('mid',mid_x)#查看是否单列出来
#print(mid_y)
plt.bar(x=mid_x,height=mid_y,color='steelblue',alpha=0.8)#作柱状图
plt.xticks(rotation='vertical')#因为横坐标文字重复,将其设置为竖排
plt.xlabel('Class')
plt.ylabel('Mid_score')
plt.title('Class_Mid')
plt.show()