Matheus Schmitz
LinkedIn
GitHub Portfolio
import sys
from pyspark import SparkContext, SparkConf
from operator import add
import time
import math
# Track time taken
start_time = time.time()

import findspark

# NOTE(review): pyspark has already been imported further up in this file, so
# this call cannot influence that import. findspark is normally initialised
# before pyspark is imported -- confirm whether this call is still needed.
findspark.init()

spark_conf = (
    SparkConf()
    .set("spark.executor.memory", "4g")
    .set("spark.driver.memory", "4g")
)
sc = SparkContext.getOrCreate(spark_conf)
sc.setLogLevel('ERROR')

# Only baskets holding more than this many entries are kept
threshold_filter = 25
# Minimum number of baskets an itemset must appear in to be reported as frequent
support = 60

input_file_path = "ta_feng_all_months_merged.csv"
output_file_path = "market_basket.csv"

# Read the CSV, asking Spark for at least round(sqrt(support)) partitions
csvRDD = sc.textFile(input_file_path, round(math.sqrt(support)))
#csvRDD = sc.textFile(input_file_path, min(support//2, 8))

# Drop the header row, then turn every remaining row into a (basket key, item) pair.
# The key joins fields 0 and 1 with '-', the item is field 5 normalised through int().
# [1:-1] drops the first and last character of a field -- presumably the
# surrounding quote characters; verify against the input file.
csvHeader = csvRDD.first()
csvRDD = (
    csvRDD.filter(lambda row: row != csvHeader)
    .map(lambda row: row.split(','))
    .map(lambda row: (row[0][1:-1] + '-' + row[1][1:-1], str(int(row[5][1:-1]))))
)

# Convert the input data to baskets: one list of items per key, keeping only
# the baskets that exceed the size filter. The result is cached because it is
# traversed by two separate mapPartitions passes later in the script; without
# caching, the post-shuffle basket construction would be recomputed each time.
bskts = (
    csvRDD.groupByKey()
    .map(lambda row: list(row[1]))
    .filter(lambda row: len(row) > threshold_filter)
    .cache()
)

# Make the number of partitions and the support threshold available to all computing nodes.
# From here on `support` is a Broadcast object: read the number through support.value.
n_part = sc.broadcast(float(bskts.getNumPartitions()))
support = sc.broadcast(float(support))
def apriori(partition):
    """Yield the itemsets that are frequent within a single partition.

    Runs A-Priori level by level on the baskets of one partition, using the
    partition-scaled threshold ``support.value / n_part.value``.

    Args:
        partition: iterable of baskets (each basket an iterable of items), as
            handed over by ``mapPartitions``. It is consumed by
            ``frequentSingles``.

    Yields:
        ``(k, itemsets)`` pairs. For ``k == 1`` ``itemsets`` is the list of
        frequent single items exactly as returned by ``frequentSingles`` (it is
        yielded even when empty). For ``k >= 2`` it is a non-empty list of
        sorted item tuples of length ``k``. Iteration stops at the first size
        that produces no candidates or no frequent itemsets.
    """
    from itertools import combinations

    # Start with singletons, aka itemset size of one, aka k=1
    k_size = 1
    freq_items, baskets = frequentSingles(partition)
    yield k_size, freq_items

    # The weighted threshold does not depend on k, so compute it only once
    local_threshold = support.value / n_part.value
    # Set copies of the baskets make every containment check O(1) per item
    basket_sets = [set(basket) for basket in baskets]
    # Wrap the singles in 1-element sets so every level is combined the same way
    freq_items = [{single} for single in freq_items]

    # Grow k until a level yields nothing; both exits are the breaks below
    while True:
        k_size += 1

        # Candidates of size k are unions of two frequent (k-1)-itemsets that
        # differ in exactly one item. Unordered pairs are enough because the
        # union is symmetric and an itemset paired with itself never grows.
        candidate_counts = {}
        for left, right in combinations(freq_items, 2):
            merged = set(left) | set(right)
            if len(merged) == k_size:
                candidate_counts[tuple(sorted(merged))] = 0

        # If no candidate sets were generated in this iteration, then stop
        if not candidate_counts:
            break

        # Count the baskets that contain every item of each candidate
        for itemset in candidate_counts:
            for basket in basket_sets:
                if basket.issuperset(itemset):
                    candidate_counts[itemset] += 1

        # Keep only the itemsets which pass the weighted threshold
        freq_items = [
            itemset
            for itemset, count in candidate_counts.items()
            if count >= local_threshold
        ]

        # If no frequent itemset of this size was found in this partition, then stop
        if not freq_items:
            break
        yield k_size, freq_items
def frequentSingles(partition):
    """Collect the baskets of a partition and find its frequent single items.

    Args:
        partition: iterable of baskets, each basket an iterable of items.

    Returns:
        A ``(freq_items, baskets)`` tuple. ``freq_items`` is the sorted list of
        items whose occurrence count reaches ``support.value / n_part.value``;
        ``baskets`` is the list of every basket read from ``partition``.
        An item that appears several times in one basket is counted once per
        appearance.
    """
    baskets = []
    occurrences = {}
    for basket in partition:
        # Keep the basket around: the partition iterator can only be read once
        baskets.append(basket)
        for item in basket:
            occurrences[item] = occurrences.get(item, 0) + 1

    # Weighted threshold: the global support split across the partitions
    minimum_count = support.value / n_part.value
    freq_items = sorted(
        item for item, seen in occurrences.items() if seen >= minimum_count
    )
    return freq_items, baskets
# Apply A-Priori to each partition to find the candidate itemsets.
# The per-partition lists are first concatenated per itemset size and then
# de-duplicated and sorted in a separate mapValues step. Doing the sort inside
# the reduceByKey function is not enough: that function is only invoked when a
# key has more than one value, so a size produced by a single partition would
# be left in generation order.
candidatesRDD = (
    bskts.mapPartitions(apriori)
    .reduceByKey(lambda x, y: x + y)
    .mapValues(lambda itemsets: sorted(set(itemsets)))
    .sortBy(lambda candidate: candidate[0])
    .collect()
)

# Broadcast candidate itemsets to all nodes
candidate_itemsets = sc.broadcast(candidatesRDD)

# Write the candidate itemsets to the output file
with open(output_file_path, 'w') as fout:
    # Write the header for the candidates step
    fout.write('Candidates:')
    # Singletons are plain strings rather than tuples, so format them by hand.
    # Index 0 holds the size-1 entry because the collected list is sorted by size.
    singles = candidate_itemsets.value[0][1]
    fout.write('\n' + ','.join(f"('{candidate}')" for candidate in singles))
    # For all other k-sized itemsets the tuple representation is written as is
    for candidates in candidate_itemsets.value[1:]:
        fout.write('\n\n' + ','.join(f"{candidate}" for candidate in candidates[1]))
# Visualize the candidate itemsets found
candidatesRDD
[(1, ['20246013', '20332433', '20412074', '20415723', '20505233', '20535407', '20535414', '20546236', '20557003', '2250062000090', '2250078000251', '2250271000034', '2250271000218', '25700010326', '28400015547', '28400017305', '29000070295', '29000070301', '37000304593', '37000329169', '37000329206', '37000337270', '37000440147', '37000440192', '37000441809', '37000442127', '37000445111', '3960097006002', '40000015314', '40000015321', '40000757320', '4006670360150', '4014400901573', '4014612509215', '41186001559', '41419761748', '41736007284', '4710008111146', '4710008241140', '4710008241218', '4710008251125', '4710008290025', '4710008290056', '4710008290155', '4710010010017', '4710011401128', '4710011401135', '4710011401142', '4710011402019', '4710011402026', '4710011402194', '4710011405133', '4710011406123', '4710011409056', '4710011432825', '4710011432856', '4710012122121', '4710012131130', '4710012242140', '4710015103370', '4710015202721', '4710015202912', '4710018004605', '4710018004704', '4710018008634', '4710018008733', '4710018031632', '4710022101208', '4710022102304', '4710022201496', '4710022237501', '4710022275503', '47100312', '4710032501791', '4710032604751', '4710032604812', '4710035310055', '4710035315517', '4710035369510', '4710036003581', '4710036003598', '4710036007039', '4710036009071', '4710036012019', '4710036024036', '4710036038033', '4710043004564', '4710043100082', '4710043355437', '4710043552065', '4710043552102', '4710043981513', '4710046011101', '4710046011156', '4710046021100', '4710046021155', '4710047502011', '4710047502066', '4710054132201', '4710054134403', '4710054135202', '4710054380619', '4710056516832', '4710063021091', '4710063121036', '4710063121661', '4710063312168', '4710084212010', '4710084225676', '4710084225829', '4710085104116', '4710085104130', '4710085120093', '4710085120475', '4710085120628', '4710085120680', '4710085120697', '4710085120703', '4710085120710', '4710085121007', '4710085172696', '4710085172702', 
'4710088410139', '4710088410207', '4710088410382', '4710088410443', '4710088410610', '4710088410795', '4710088411761', '4710088411785', '4710088412201', '4710088412218', '4710088414021', '4710088414052', '4710088414113', '4710088414311', '4710088414328', '4710088424655', '4710088424778', '4710088431295', '4710088431301', '4710088620156', '4710094004711', '4710094009273', '4710094014383', '4710094014406', '4710094014741', '4710094014765', '4710094014789', '4710094020063', '4710094020179', '4710094020193', '4710094021466', '4710094021565', '4710094034039', '4710094097768', '4710094699078', '4710095958006', '4710095987402', '4710098156133', '4710104106107', '4710104111569', '4710104112924', '4710105002019', '4710105002026', '4710105010137', '4710105010182', '4710105015118', '4710105015125', '4710105015514', '4710105015521', '4710105020310', '4710105046013', '4710105080420', '4710109770396', '4710110510523', '4710110513265', '4710114105046', '4710114128038', '4710114128618', '4710114308119', '4710114322115', '4710114361114', '4710114362029', '4710114604112', '4710114606048', '4710114730002', '4710120001653', '4710120560129', '4710126020467', '4710126040663', '4710126185012', '4710126190535', '4710126191259', '4710126194281', '4710128030020', '4710128030037', '4710128420203', '4710134023214', '4710134023276', '4710140300453', '4710154012144', '4710154015206', '4710154620264', '4710160001033', '4710160013074', '4710167111018', '4710168102206', '4710168102237', '4710168702079', '4710168705056', '4710171021327', '4710171120228', '4710172030014', '4710174011790', '4710174018195', '4710174041551', '4710174053691', '4710175567128', '4710176011040', '4710176011194', '4710176023098', '4710176121145', '4710186051098', '4710186063145', '4710186064364', '4710186185021', '4710189810333', '4710199010174', '4710247005206', '4710247005299', '4710247005831', '4710247006128', '4710247006562', '4710247007613', '4710249001992', '4710249002005', '4710249002012', '4710249002029', 
'4710254011627', '4710254015021', '4710254049323', '4710254049521', '4710265847666', '4710265849066', '4710291101039', '4710291112172', '4710291138134', '4710303267814', '4710303280172', '4710303340234', '4710304111147', '4710311010211', '4710311012413', '4710311703014', '4710311811511', '4710320224265', '4710321790509', '4710321861186', '4710323168016', '4710323168054', '4710323168078', '4710363342001', '4710363352000', '4710363502009', '4710363502016', '4710363522007', '4710363522014', '4710363553025', '4710363604000', '4710363609005', '4710367807421', '4710367814337', '4710421019081', '4710421090059', '4710452110115', '4710466101130', '4710466102052', '4710466103073', '4710466103080', '4710467221196', '4710467221516', '4710492321038', '4710494050110', '4710498235148', '4710498235674', '4710530033039', '4710543214012', '4710543215040', '4710543310059', '4710543310066', '4710549000039', '4710579101003', '4710583110015', '4710583300089', '4710583920003', '4710583996008', '4710585930154', '4710585930673', '4710603011018', '4710603013425', '4710626110156', '4710626110613', '4710626111252', '4710626111351', '4710626111450', '4710626210955', '4710626622857', '4710626844358', '4710661202960', '4710670200100', '4710670200407', '47106710', '4710683100015', '4710685440362', '4710685443820', '4710685443837', '4710731050101', '4710731060124', '4710734000011', '4710740600090', '4710740600106', '4710762101025', '4710762101032', '4710762101063', '4710823997208', '4710823997239', '4710852001013', '4710857000028', '4710857000059', '4710857472634', '4710868501019', '4710871000165', '4710892632017', '4710908111116', '4710908120019', '4710908131534', '4710908131589', '4710908131824', '4710943100328', '4710943100335', '4710943100410', '4710943101318', '4710960911082', '4710988000553', '4710998001205', '4711001302104', '4711001917018', '4711022100017', '4711080010112', '4711080040195', '4711128778813', '4711128778875', '4711128778882', '4711258001256', '4711258002505', 
'4711258004004', '4711258004110', '4711258004509', '4711258007371', '4711271000014', '4711271000090', '4711271000472', '4711300986999', '4711390437081', '4711413000025', '4711437000018', '4711524000891', '4711524000907', '4711524001041', '4711572001178', '4711587808946', '4711587808953', '4711634002297', '4711634002570', '4711634002587', '4711663700010', '4711713390031', '4711856000125', '4711856020215', '4711863180070', '4711985100017', '4711985100024', '4712019100591', '4712019100607', '4712031000022', '4712054701371', '4712054702255', '4712162000038', '4712172010010', '4712425010255', '4712425010446', '4712425010569', '4712425010712', '4712453001072', '4712598000015', '4712697490816', '4712934010074', '4712987106649', '4713080626225', '4713089003102', '4713754987607', '4713754987614', '4713985863121', '4714058833195', '4714058833218', '4714082270041', '4714108700040', '4714242832669', '4714381003128', '4714390001023', '4714981010038', '4715874000150', '4715874000518', '4715874000600', '4715874000662', '4716701000060', '4717101101265', '4718433613228', '4718433613310', '4719090105002', '4719090105019', '4719090900058', '4719090900065', '4719111313102', '4719581980309', '4719854271752', '4719859015061', '4901422038939', '4901550152484', '4901550332077', '4901872810451', '4902430001908', '4902430041232', '4902430041270', '4902738431155', '4902757148805', '4903111146055', '4903111345717', '4973540001256', '50000301829', '51000036872', '719859796117', '7231254801515', '723125485025', '723125485032', '723125488019', '7231254880206', '723125488026', '723125488033', '723125488064', '74261160506', '76150215281', '78895100020', '8801019931536', '8801266122145', '8850332247152', '8851111202645', '8851111202652', '8888021100624', '8888021200256', '8888420881162', '9300644131711', '9300644131759', '9300644131766', '9300644131773', '9310042238981', '9310042571491', '9310052751203', '9310052751227', '9414863071854']), (2, [('4710011401128', '4710011401135'), ('4710011401128', 
'4710011401142'), ('4710011401128', '4710011405133'), ('4710011401128', '4710011406123'), ('4710011401128', '4710011409056'), ('4710011401135', '4710011405133'), ('4710011401135', '4710011406123'), ('4710011401135', '4710011409056'), ('4710011401142', '4710011405133'), ('4710011401142', '4710011406123'), ('4710011405133', '4710011406123'), ('4710011405133', '4710011409056'), ('4710011406123', '4710011409056'), ('4710018004605', '4710018004704'), ('4710018004605', '4710088410139'), ('4710018008634', '4710018008733'), ('4710018008634', '4710154620264'), ('4710043552102', '4711271000014'), ('4710085120093', '4710085120628'), ('4710085120093', '4710085172696'), ('4710085120093', '4710085172702'), ('4710085120628', '4710085172696'), ('4710085120628', '4710085172702'), ('4710085120680', '4710085120697'), ('4710085120703', '4710085120710'), ('4710085172696', '4710085172702'), ('4710088410139', '4710088410207'), ('4710088414311', '4710088414328'), ('4710114105046', '4711271000014'), ('4710114128038', '4710114606048'), ('4710114128038', '4711271000014'), ('4710114606048', '4711271000014'), ('4710254049323', '4710254049521'), ('4710908131589', '4711271000014'), ('4711271000014', '4714981010038'), ('4711524000891', '4711524000907'), ('4711524000891', '4711524001041'), ('4711524000907', '4711524001041'), ('4713754987614', '7231254880206'), ('4714981010038', '4719090900065'), ('723125485032', '7231254880206')]), (3, [('4710011401128', '4710011401135', '4710011405133'), ('4710011401128', '4710011401135', '4710011406123'), ('4710011401128', '4710011401135', '4710011409056'), ('4710011401128', '4710011405133', '4710011406123'), ('4710011401128', '4710011405133', '4710011409056'), ('4710011401128', '4710011406123', '4710011409056'), ('4710011401135', '4710011405133', '4710011406123'), ('4710011401135', '4710011405133', '4710011409056'), ('4710011401135', '4710011406123', '4710011409056'), ('4710011401142', '4710011405133', '4710011406123'), ('4710011405133', '4710011406123', 
'4710011409056'), ('4710085120093', '4710085120628', '4710085172696'), ('4710085120628', '4710085172696', '4710085172702'), ('4711524000891', '4711524000907', '4711524001041')]), (4, [('4710011401128', '4710011401135', '4710011405133', '4710011409056')])]
def formattedCandidateCounts(partition):
    """Yield ``(itemset_tuple, count)`` pairs for the candidates seen in a partition.

    The counts come from ``countCandidates``; keys that are not already tuples
    (the single-item candidates) are wrapped into 1-element tuples so that
    every yielded key has the same type.

    Args:
        partition: iterable of baskets, forwarded to ``countCandidates``.
    """
    for key, occurrences in countCandidates(partition).items():
        if type(key) is tuple:
            yield key, occurrences
        else:
            # Single item candidates are stored bare; wrap them into a tuple
            yield (key,), occurrences
def countCandidates(partition):
    """Count, per candidate itemset, the baskets of a partition that contain it.

    Candidates are read from the broadcast ``candidate_itemsets``, whose value
    is a list of ``(size, itemsets)`` entries.

    Args:
        partition: iterable of baskets, each basket an iterable of items.

    Returns:
        Dict mapping each candidate (in its original form: a bare item for
        singletons, a tuple otherwise) to the number of baskets that contain
        all of its items. Candidates found in no basket are absent.
    """
    # Resolve every candidate to its tuple form once instead of once per basket
    candidates = []
    for sized_group in candidate_itemsets.value:
        for itemset in sized_group[1]:
            items = itemset if isinstance(itemset, tuple) else (itemset,)
            candidates.append((itemset, items))

    counts = {}
    # Loop through each basket in the partition
    for basket in partition:
        # A set gives O(1) membership tests; a basket counts at most once per candidate
        basket_items = set(basket)
        for itemset, items in candidates:
            if basket_items.issuperset(items):
                counts[itemset] = counts.get(itemset, 0) + 1
    return counts
# Count candidates over all baskets to see which are truly frequent itemsets:
# per-partition counts are summed and compared against the full support.
freqItemsetsRDD = (
    bskts.mapPartitions(formattedCandidateCounts)
    .reduceByKey(add)
    .filter(lambda candidate: candidate[1] >= support.value)
    .keys()
    .sortBy(lambda itemset: (len(itemset), itemset))
    .collect()
)

# Organize the itemsets by size, for outputting. The collected list is sorted
# by size first, so the dict keys are inserted in ascending size order.
output = dict()
for itemset in freqItemsetsRDD:
    output.setdefault(len(itemset), []).append(itemset)

# Open the output file in append mode and add the frequent itemsets
with open(output_file_path, 'a') as fout:
    # Add a header for Frequent Itemsets
    fout.write('\n\n' + 'Frequent Itemsets:')
    # Singletons get a special format without the trailing tuple comma.
    # .get() keeps the script from failing with a KeyError when no itemset
    # at all reaches the support threshold.
    fout.write('\n' + ','.join(f"('{itemset[0]}')" for itemset in output.get(1, [])))
    # For all other k-sized itemsets simply output them
    for itemset_size in [size for size in output if size != 1]:
        fout.write('\n\n' + ','.join(f"{itemset}" for itemset in output[itemset_size]))
# Visualize the frequent itemsets found
output
{1: [('20412074',), ('20557003',), ('37000329169',), ('37000329206',), ('37000337270',), ('37000440147',), ('37000442127',), ('37000445111',), ('4014400901573',), ('4710011401128',), ('4710011401135',), ('4710011401142',), ('4710011402019',), ('4710011405133',), ('4710011406123',), ('4710011409056',), ('4710011432825',), ('4710012122121',), ('4710012131130',), ('4710015103370',), ('4710015202721',), ('4710018004605',), ('4710018004704',), ('4710018008634',), ('4710022201496',), ('4710022237501',), ('4710022275503',), ('4710032501791',), ('4710035369510',), ('4710036003581',), ('4710036007039',), ('4710036009071',), ('4710036012019',), ('4710043552102',), ('4710046021100',), ('4710054134403',), ('4710054380619',), ('4710063312168',), ('4710084225676',), ('4710085104116',), ('4710085104130',), ('4710085120093',), ('4710085120628',), ('4710085120680',), ('4710085120703',), ('4710085120710',), ('4710085172696',), ('4710085172702',), ('4710088410139',), ('4710088410207',), ('4710088410382',), ('4710088410443',), ('4710088410610',), ('4710088414311',), ('4710088414328',), ('4710088620156',), ('4710094004711',), ('4710094014765',), ('4710094021466',), ('4710094097768',), ('4710094699078',), ('4710104106107',), ('4710104111569',), ('4710104112924',), ('4710105002019',), ('4710105002026',), ('4710114105046',), ('4710114128038',), ('4710114322115',), ('4710114362029',), ('4710114606048',), ('4710128030037',), ('4710134023214',), ('4710154012144',), ('4710154015206',), ('4710154620264',), ('4710160001033',), ('4710168705056',), ('4710174041551',), ('4710174053691',), ('4710247005299',), ('4710247006562',), ('4710254049323',), ('4710254049521',), ('4710265849066',), ('4710291112172',), ('4710311703014',), ('4710321790509',), ('4710323168054',), ('4710363352000',), ('4710363522007',), ('4710421090059',), ('4710452110115',), ('4710466101130',), ('4710466103080',), ('4710467221196',), ('4710494050110',), ('4710498235148',), ('4710543214012',), ('4710543215040',), 
('4710543310059',), ('4710543310066',), ('4710579101003',), ('4710583110015',), ('4710583996008',), ('4710603013425',), ('4710626111351',), ('4710626111450',), ('4710626622857',), ('47106710',), ('4710685440362',), ('4710731050101',), ('4710740600090',), ('4710857000059',), ('4710892632017',), ('4710908111116',), ('4710908120019',), ('4710908131534',), ('4710908131589',), ('4710908131824',), ('4710943101318',), ('4711001302104',), ('4711022100017',), ('4711080010112',), ('4711258001256',), ('4711258002505',), ('4711258004004',), ('4711258007371',), ('4711271000014',), ('4711271000090',), ('4711271000472',), ('4711524001041',), ('4711587808946',), ('4711587808953',), ('4711663700010',), ('4711985100024',), ('4712019100591',), ('4712019100607',), ('4712054701371',), ('4712425010255',), ('4712425010712',), ('4713089003102',), ('4713754987607',), ('4713754987614',), ('4713985863121',), ('4714082270041',), ('4714381003128',), ('4714981010038',), ('4718433613228',), ('4719090900058',), ('4719090900065',), ('4901422038939',), ('4901550152484',), ('50000301829',), ('719859796117',), ('723125485032',), ('7231254880206',), ('8888021200256',), ('9310052751203',)], 2: [('4710011401128', '4710011401135'), ('4710011401128', '4710011405133'), ('4710011401128', '4710011406123'), ('4710011401128', '4710011409056'), ('4710011401135', '4710011405133'), ('4710011401135', '4710011406123'), ('4710011401135', '4710011409056'), ('4710011405133', '4710011406123'), ('4710011405133', '4710011409056'), ('4710018004605', '4710018004704'), ('4710085120093', '4710085120628'), ('4710085120093', '4710085172696'), ('4710085120628', '4710085172696')], 3: [('4710011401128', '4710011401135', '4710011405133'), ('4710011401128', '4710011405133', '4710011409056')]}
# Report how many seconds have passed since start_time was recorded
end_time = time.time()
time_elapsed = end_time - start_time
print(f'Duration: {time_elapsed}')
Duration: 134.5662498474121
Matheus Schmitz
LinkedIn
GitHub Portfolio