Matheus Schmitz
LinkedIn
GitHub Portfolio
import sys
from pyspark import SparkContext, SparkConf
from operator import add
import time
import math
# Record when the run started so the total duration can be reported at the end
start_time = time.time()

import findspark
findspark.init()

# Reuse an active SparkContext when there is one; otherwise create one that
# requests 4g of executor and driver memory. Only errors are logged.
spark_conf = SparkConf()
spark_conf = spark_conf.set("spark.executor.memory", "4g")
spark_conf = spark_conf.set("spark.driver.memory", "4g")
sc = SparkContext.getOrCreate(spark_conf)
sc.setLogLevel('ERROR')

# Run parameters
threshold_filter = 25  # a basket is kept only when it holds more entries than this
support = 60  # minimum total count for an itemset to be reported as frequent
input_file_path = "ta_feng_all_months_merged.csv"
output_file_path = "market_basket.csv"

# Load the CSV as lines of text; round(sqrt(support)) is passed as the
# minimum-partitions hint
min_partitions = round(math.sqrt(support))
csvRDD = sc.textFile(input_file_path, min_partitions)

# The first line is the header; every line equal to it is dropped
csvHeader = csvRDD.first()
data_lines = csvRDD.filter(lambda row: row != csvHeader)
split_lines = data_lines.map(lambda row: row.split(','))

# Key: columns 0 and 1 joined with '-'. Value: column 5 passed through int()
# and back to str. Each [1:-1] slice drops the field's first and last
# character (presumably the surrounding quotes -- verify against the file).
csvRDD = split_lines.map(
    lambda fields: (fields[0][1:-1] + '-' + fields[1][1:-1], str(int(fields[5][1:-1])))
)

# One basket per key: the list of every value sharing that key, kept only
# when the list is longer than threshold_filter
grouped_by_key = csvRDD.groupByKey()
bskts = grouped_by_key.map(lambda row: list(row[1]))
bskts = bskts.filter(lambda row: len(row) > threshold_filter)

# Broadcast the partition count and the support threshold (both as floats)
# so that the functions running on the workers can read them
n_part = sc.broadcast(float(bskts.getNumPartitions()))
support = sc.broadcast(float(support))
def apriori(partition):
    """Run A-Priori on one partition and yield its locally frequent itemsets.

    Yields ``(k, itemsets)`` pairs for increasing itemset size ``k``. For
    ``k == 1`` the itemsets are the sorted list of single items returned by
    ``frequentSingles``; for ``k >= 2`` they are tuples of items in sorted
    order. An itemset is kept when the number of baskets in this partition
    containing all of its items is at least ``support.value / n_part.value``.
    Iteration stops at the first ``k`` for which no candidate can be built or
    no candidate reaches that threshold.

    Args:
        partition: iterable of baskets, each basket an iterable of items.

    Yields:
        tuple: ``(k, list_of_itemsets)``; the size-1 entry is always yielded,
        even when its list is empty.
    """
    # Frequent single items of this partition plus the materialized baskets
    freq_items, baskets = frequentSingles(partition)
    k_size = 1
    yield k_size, freq_items

    # Every candidate is tested against every basket, so convert the baskets
    # to sets once: each containment test then avoids a linear list scan.
    basket_sets = [set(basket) for basket in baskets]
    # Support threshold scaled down to a single partition
    local_threshold = support.value / n_part.value

    # Hold every itemset as a set so singles and larger itemsets merge alike
    freq_items = [{single} for single in freq_items]

    while True:
        k_size += 1

        # A candidate is the union of two frequent (k-1)-itemsets that has
        # exactly k items. Unordered pairs are enough because the union is
        # symmetric and an itemset merged with itself never grows.
        candidate_counts = {}
        for position, first in enumerate(freq_items):
            for second in freq_items[position + 1:]:
                merged = first | second
                if len(merged) == k_size:
                    candidate_counts[tuple(sorted(merged))] = 0

        # No candidate of this size could be built: stop
        if not candidate_counts:
            break

        # Count the baskets that contain every item of each candidate
        for itemset in candidate_counts:
            candidate_counts[itemset] = sum(
                1 for basket in basket_sets if basket.issuperset(itemset)
            )

        frequent = [
            itemset
            for itemset, count in candidate_counts.items()
            if count >= local_threshold
        ]

        # No candidate of this size is frequent in this partition: stop
        if not frequent:
            break

        yield k_size, frequent
        freq_items = [set(itemset) for itemset in frequent]
def frequentSingles(partition):
    """Find the single items that are frequent within one partition.

    Args:
        partition: iterable of baskets, each basket an iterable of hashable
            items. It is consumed exactly once.

    Returns:
        tuple: ``(freq_items, baskets)`` where ``freq_items`` is the sorted
        list of items contained in at least ``support.value / n_part.value``
        baskets, and ``baskets`` is the list of all baskets in their original
        order and form.
    """
    singleton_counts, baskets = {}, []
    for basket in partition:
        # Keep the basket so later passes can re-scan the partition
        baskets.append(basket)
        # Support is the number of baskets containing an item, so an item
        # repeated inside one basket must be counted only once.
        for item in set(basket):
            singleton_counts[item] = singleton_counts.get(item, 0) + 1

    # Support threshold scaled down to a single partition
    local_threshold = support.value / n_part.value
    freq_items = sorted(
        item for item, count in singleton_counts.items() if count >= local_threshold
    )
    return freq_items, baskets
# First pass: run A-Priori inside every partition. Results are keyed by
# itemset size; when two partitions report the same size their lists are
# merged into one sorted list of distinct itemsets. The merged result is
# ordered by size and brought back to the driver.
local_results = bskts.mapPartitions(lambda partition: apriori(partition))
merged_by_size = local_results.reduceByKey(lambda x, y: sorted(set(x + y)))
candidatesRDD = merged_by_size.sortBy(lambda candidate: candidate[0]).collect()

# Share the collected candidates (a plain list at this point) with all nodes
candidate_itemsets = sc.broadcast(candidatesRDD)

# Start the output file with the candidates section
with open(output_file_path, 'w') as fout:
    fout.write('Candidates:')
    # The first entry holds the single items, which are bare strings rather
    # than tuples, so the tuple-like formatting is written by hand
    singles = candidate_itemsets.value[0][1]
    fout.write('\n' + ','.join(f"('{candidate}')" for candidate in singles))
    # Every larger size goes on its own line, preceded by a blank line
    for candidates in candidate_itemsets.value[1:]:
        fout.write('\n\n' + ','.join(map(str, candidates[1])))
# Visualize the candidate itemsets found
candidatesRDD
[(1,
['20246013',
'20332433',
'20412074',
'20415723',
'20505233',
'20535407',
'20535414',
'20546236',
'20557003',
'2250062000090',
'2250078000251',
'2250271000034',
'2250271000218',
'25700010326',
'28400015547',
'28400017305',
'29000070295',
'29000070301',
'37000304593',
'37000329169',
'37000329206',
'37000337270',
'37000440147',
'37000440192',
'37000441809',
'37000442127',
'37000445111',
'3960097006002',
'40000015314',
'40000015321',
'40000757320',
'4006670360150',
'4014400901573',
'4014612509215',
'41186001559',
'41419761748',
'41736007284',
'4710008111146',
'4710008241140',
'4710008241218',
'4710008251125',
'4710008290025',
'4710008290056',
'4710008290155',
'4710010010017',
'4710011401128',
'4710011401135',
'4710011401142',
'4710011402019',
'4710011402026',
'4710011402194',
'4710011405133',
'4710011406123',
'4710011409056',
'4710011432825',
'4710011432856',
'4710012122121',
'4710012131130',
'4710012242140',
'4710015103370',
'4710015202721',
'4710015202912',
'4710018004605',
'4710018004704',
'4710018008634',
'4710018008733',
'4710018031632',
'4710022101208',
'4710022102304',
'4710022201496',
'4710022237501',
'4710022275503',
'47100312',
'4710032501791',
'4710032604751',
'4710032604812',
'4710035310055',
'4710035315517',
'4710035369510',
'4710036003581',
'4710036003598',
'4710036007039',
'4710036009071',
'4710036012019',
'4710036024036',
'4710036038033',
'4710043004564',
'4710043100082',
'4710043355437',
'4710043552065',
'4710043552102',
'4710043981513',
'4710046011101',
'4710046011156',
'4710046021100',
'4710046021155',
'4710047502011',
'4710047502066',
'4710054132201',
'4710054134403',
'4710054135202',
'4710054380619',
'4710056516832',
'4710063021091',
'4710063121036',
'4710063121661',
'4710063312168',
'4710084212010',
'4710084225676',
'4710084225829',
'4710085104116',
'4710085104130',
'4710085120093',
'4710085120475',
'4710085120628',
'4710085120680',
'4710085120697',
'4710085120703',
'4710085120710',
'4710085121007',
'4710085172696',
'4710085172702',
'4710088410139',
'4710088410207',
'4710088410382',
'4710088410443',
'4710088410610',
'4710088410795',
'4710088411761',
'4710088411785',
'4710088412201',
'4710088412218',
'4710088414021',
'4710088414052',
'4710088414113',
'4710088414311',
'4710088414328',
'4710088424655',
'4710088424778',
'4710088431295',
'4710088431301',
'4710088620156',
'4710094004711',
'4710094009273',
'4710094014383',
'4710094014406',
'4710094014741',
'4710094014765',
'4710094014789',
'4710094020063',
'4710094020179',
'4710094020193',
'4710094021466',
'4710094021565',
'4710094034039',
'4710094097768',
'4710094699078',
'4710095958006',
'4710095987402',
'4710098156133',
'4710104106107',
'4710104111569',
'4710104112924',
'4710105002019',
'4710105002026',
'4710105010137',
'4710105010182',
'4710105015118',
'4710105015125',
'4710105015514',
'4710105015521',
'4710105020310',
'4710105046013',
'4710105080420',
'4710109770396',
'4710110510523',
'4710110513265',
'4710114105046',
'4710114128038',
'4710114128618',
'4710114308119',
'4710114322115',
'4710114361114',
'4710114362029',
'4710114604112',
'4710114606048',
'4710114730002',
'4710120001653',
'4710120560129',
'4710126020467',
'4710126040663',
'4710126185012',
'4710126190535',
'4710126191259',
'4710126194281',
'4710128030020',
'4710128030037',
'4710128420203',
'4710134023214',
'4710134023276',
'4710140300453',
'4710154012144',
'4710154015206',
'4710154620264',
'4710160001033',
'4710160013074',
'4710167111018',
'4710168102206',
'4710168102237',
'4710168702079',
'4710168705056',
'4710171021327',
'4710171120228',
'4710172030014',
'4710174011790',
'4710174018195',
'4710174041551',
'4710174053691',
'4710175567128',
'4710176011040',
'4710176011194',
'4710176023098',
'4710176121145',
'4710186051098',
'4710186063145',
'4710186064364',
'4710186185021',
'4710189810333',
'4710199010174',
'4710247005206',
'4710247005299',
'4710247005831',
'4710247006128',
'4710247006562',
'4710247007613',
'4710249001992',
'4710249002005',
'4710249002012',
'4710249002029',
'4710254011627',
'4710254015021',
'4710254049323',
'4710254049521',
'4710265847666',
'4710265849066',
'4710291101039',
'4710291112172',
'4710291138134',
'4710303267814',
'4710303280172',
'4710303340234',
'4710304111147',
'4710311010211',
'4710311012413',
'4710311703014',
'4710311811511',
'4710320224265',
'4710321790509',
'4710321861186',
'4710323168016',
'4710323168054',
'4710323168078',
'4710363342001',
'4710363352000',
'4710363502009',
'4710363502016',
'4710363522007',
'4710363522014',
'4710363553025',
'4710363604000',
'4710363609005',
'4710367807421',
'4710367814337',
'4710421019081',
'4710421090059',
'4710452110115',
'4710466101130',
'4710466102052',
'4710466103073',
'4710466103080',
'4710467221196',
'4710467221516',
'4710492321038',
'4710494050110',
'4710498235148',
'4710498235674',
'4710530033039',
'4710543214012',
'4710543215040',
'4710543310059',
'4710543310066',
'4710549000039',
'4710579101003',
'4710583110015',
'4710583300089',
'4710583920003',
'4710583996008',
'4710585930154',
'4710585930673',
'4710603011018',
'4710603013425',
'4710626110156',
'4710626110613',
'4710626111252',
'4710626111351',
'4710626111450',
'4710626210955',
'4710626622857',
'4710626844358',
'4710661202960',
'4710670200100',
'4710670200407',
'47106710',
'4710683100015',
'4710685440362',
'4710685443820',
'4710685443837',
'4710731050101',
'4710731060124',
'4710734000011',
'4710740600090',
'4710740600106',
'4710762101025',
'4710762101032',
'4710762101063',
'4710823997208',
'4710823997239',
'4710852001013',
'4710857000028',
'4710857000059',
'4710857472634',
'4710868501019',
'4710871000165',
'4710892632017',
'4710908111116',
'4710908120019',
'4710908131534',
'4710908131589',
'4710908131824',
'4710943100328',
'4710943100335',
'4710943100410',
'4710943101318',
'4710960911082',
'4710988000553',
'4710998001205',
'4711001302104',
'4711001917018',
'4711022100017',
'4711080010112',
'4711080040195',
'4711128778813',
'4711128778875',
'4711128778882',
'4711258001256',
'4711258002505',
'4711258004004',
'4711258004110',
'4711258004509',
'4711258007371',
'4711271000014',
'4711271000090',
'4711271000472',
'4711300986999',
'4711390437081',
'4711413000025',
'4711437000018',
'4711524000891',
'4711524000907',
'4711524001041',
'4711572001178',
'4711587808946',
'4711587808953',
'4711634002297',
'4711634002570',
'4711634002587',
'4711663700010',
'4711713390031',
'4711856000125',
'4711856020215',
'4711863180070',
'4711985100017',
'4711985100024',
'4712019100591',
'4712019100607',
'4712031000022',
'4712054701371',
'4712054702255',
'4712162000038',
'4712172010010',
'4712425010255',
'4712425010446',
'4712425010569',
'4712425010712',
'4712453001072',
'4712598000015',
'4712697490816',
'4712934010074',
'4712987106649',
'4713080626225',
'4713089003102',
'4713754987607',
'4713754987614',
'4713985863121',
'4714058833195',
'4714058833218',
'4714082270041',
'4714108700040',
'4714242832669',
'4714381003128',
'4714390001023',
'4714981010038',
'4715874000150',
'4715874000518',
'4715874000600',
'4715874000662',
'4716701000060',
'4717101101265',
'4718433613228',
'4718433613310',
'4719090105002',
'4719090105019',
'4719090900058',
'4719090900065',
'4719111313102',
'4719581980309',
'4719854271752',
'4719859015061',
'4901422038939',
'4901550152484',
'4901550332077',
'4901872810451',
'4902430001908',
'4902430041232',
'4902430041270',
'4902738431155',
'4902757148805',
'4903111146055',
'4903111345717',
'4973540001256',
'50000301829',
'51000036872',
'719859796117',
'7231254801515',
'723125485025',
'723125485032',
'723125488019',
'7231254880206',
'723125488026',
'723125488033',
'723125488064',
'74261160506',
'76150215281',
'78895100020',
'8801019931536',
'8801266122145',
'8850332247152',
'8851111202645',
'8851111202652',
'8888021100624',
'8888021200256',
'8888420881162',
'9300644131711',
'9300644131759',
'9300644131766',
'9300644131773',
'9310042238981',
'9310042571491',
'9310052751203',
'9310052751227',
'9414863071854']),
(2,
[('4710011401128', '4710011401135'),
('4710011401128', '4710011401142'),
('4710011401128', '4710011405133'),
('4710011401128', '4710011406123'),
('4710011401128', '4710011409056'),
('4710011401135', '4710011405133'),
('4710011401135', '4710011406123'),
('4710011401135', '4710011409056'),
('4710011401142', '4710011405133'),
('4710011401142', '4710011406123'),
('4710011405133', '4710011406123'),
('4710011405133', '4710011409056'),
('4710011406123', '4710011409056'),
('4710018004605', '4710018004704'),
('4710018004605', '4710088410139'),
('4710018008634', '4710018008733'),
('4710018008634', '4710154620264'),
('4710043552102', '4711271000014'),
('4710085120093', '4710085120628'),
('4710085120093', '4710085172696'),
('4710085120093', '4710085172702'),
('4710085120628', '4710085172696'),
('4710085120628', '4710085172702'),
('4710085120680', '4710085120697'),
('4710085120703', '4710085120710'),
('4710085172696', '4710085172702'),
('4710088410139', '4710088410207'),
('4710088414311', '4710088414328'),
('4710114105046', '4711271000014'),
('4710114128038', '4710114606048'),
('4710114128038', '4711271000014'),
('4710114606048', '4711271000014'),
('4710254049323', '4710254049521'),
('4710908131589', '4711271000014'),
('4711271000014', '4714981010038'),
('4711524000891', '4711524000907'),
('4711524000891', '4711524001041'),
('4711524000907', '4711524001041'),
('4713754987614', '7231254880206'),
('4714981010038', '4719090900065'),
('723125485032', '7231254880206')]),
(3,
[('4710011401128', '4710011401135', '4710011405133'),
('4710011401128', '4710011401135', '4710011406123'),
('4710011401128', '4710011401135', '4710011409056'),
('4710011401128', '4710011405133', '4710011406123'),
('4710011401128', '4710011405133', '4710011409056'),
('4710011401128', '4710011406123', '4710011409056'),
('4710011401135', '4710011405133', '4710011406123'),
('4710011401135', '4710011405133', '4710011409056'),
('4710011401135', '4710011406123', '4710011409056'),
('4710011401142', '4710011405133', '4710011406123'),
('4710011405133', '4710011406123', '4710011409056'),
('4710085120093', '4710085120628', '4710085172696'),
('4710085120628', '4710085172696', '4710085172702'),
('4711524000891', '4711524000907', '4711524001041')]),
(4, [('4710011401128', '4710011401135', '4710011405133', '4710011409056')])]
def formattedCandidateCounts(partition):
    """Yield ``(itemset, count)`` for each candidate counted in a partition.

    The counts come from ``countCandidates``. Keys that are not already
    tuples (the single items) are wrapped in a one-element tuple so that
    every yielded key is a tuple.

    Args:
        partition: iterable of baskets, passed on to ``countCandidates``.
    """
    partition_counts = countCandidates(partition)
    for candidate, count in partition_counts.items():
        if type(candidate) is tuple:
            yield candidate, count
        else:
            yield (candidate,), count
def countCandidates(partition):
    """Count how many baskets of a partition contain each candidate itemset.

    Candidates are read from the broadcast ``candidate_itemsets``, whose
    value is a sequence of ``(size, itemsets)`` entries. A single-item
    candidate may be stored as a bare item instead of a tuple.

    Args:
        partition: iterable of baskets, each basket an iterable of hashable
            items.

    Returns:
        dict: maps each candidate found in at least one basket, in the form
        it is stored in ``candidate_itemsets``, to the number of baskets that
        contain all of its items.
    """
    # Flatten the candidates and coerce singles to tuples once, instead of
    # repeating that work for every basket
    candidates = [
        (itemset, itemset if isinstance(itemset, tuple) else (itemset,))
        for entry in candidate_itemsets.value
        for itemset in entry[1]
    ]

    counts = {}
    for basket in partition:
        # A set avoids a linear list scan for every containment test
        basket_items = set(basket)
        for itemset, itemset_as_tuple in candidates:
            if basket_items.issuperset(itemset_as_tuple):
                counts[itemset] = counts.get(itemset, 0) + 1
    return counts
# Second pass: count every candidate over all baskets, keep the ones whose
# total count reaches the support threshold, and order them by size and then
# by itemset
freqItemsetsRDD = bskts.mapPartitions(lambda partition: formattedCandidateCounts(partition)) \
    .reduceByKey(add) \
    .filter(lambda candidate: candidate[1] >= support.value) \
    .keys().sortBy(lambda itemset: (len(itemset), itemset)) \
    .collect()

# Group the frequent itemsets by size for output
output = dict()
for itemset in freqItemsetsRDD:
    output.setdefault(len(itemset), []).append(itemset)

# Append the frequent-itemsets section to the output file
with open(output_file_path, 'a') as fout:
    fout.write('\n\n' + 'Frequent Itemsets:')
    # Singles are one-element tuples; they are formatted by hand to avoid the
    # trailing comma of Python's tuple repr. .get() keeps this line valid
    # when no itemset reached the support threshold.
    fout.write('\n' + ','.join([f"('{itemset[0]}')" for itemset in output.get(1, [])]))
    # Larger sizes are selected by key, in ascending order, rather than by
    # their position in the dict
    for itemset_size in sorted(size for size in output if size != 1):
        fout.write('\n\n' + ','.join([f"{itemset}" for itemset in output[itemset_size]]))
# Visualize the frequent itemsets found
output
{1: [('20412074',),
('20557003',),
('37000329169',),
('37000329206',),
('37000337270',),
('37000440147',),
('37000442127',),
('37000445111',),
('4014400901573',),
('4710011401128',),
('4710011401135',),
('4710011401142',),
('4710011402019',),
('4710011405133',),
('4710011406123',),
('4710011409056',),
('4710011432825',),
('4710012122121',),
('4710012131130',),
('4710015103370',),
('4710015202721',),
('4710018004605',),
('4710018004704',),
('4710018008634',),
('4710022201496',),
('4710022237501',),
('4710022275503',),
('4710032501791',),
('4710035369510',),
('4710036003581',),
('4710036007039',),
('4710036009071',),
('4710036012019',),
('4710043552102',),
('4710046021100',),
('4710054134403',),
('4710054380619',),
('4710063312168',),
('4710084225676',),
('4710085104116',),
('4710085104130',),
('4710085120093',),
('4710085120628',),
('4710085120680',),
('4710085120703',),
('4710085120710',),
('4710085172696',),
('4710085172702',),
('4710088410139',),
('4710088410207',),
('4710088410382',),
('4710088410443',),
('4710088410610',),
('4710088414311',),
('4710088414328',),
('4710088620156',),
('4710094004711',),
('4710094014765',),
('4710094021466',),
('4710094097768',),
('4710094699078',),
('4710104106107',),
('4710104111569',),
('4710104112924',),
('4710105002019',),
('4710105002026',),
('4710114105046',),
('4710114128038',),
('4710114322115',),
('4710114362029',),
('4710114606048',),
('4710128030037',),
('4710134023214',),
('4710154012144',),
('4710154015206',),
('4710154620264',),
('4710160001033',),
('4710168705056',),
('4710174041551',),
('4710174053691',),
('4710247005299',),
('4710247006562',),
('4710254049323',),
('4710254049521',),
('4710265849066',),
('4710291112172',),
('4710311703014',),
('4710321790509',),
('4710323168054',),
('4710363352000',),
('4710363522007',),
('4710421090059',),
('4710452110115',),
('4710466101130',),
('4710466103080',),
('4710467221196',),
('4710494050110',),
('4710498235148',),
('4710543214012',),
('4710543215040',),
('4710543310059',),
('4710543310066',),
('4710579101003',),
('4710583110015',),
('4710583996008',),
('4710603013425',),
('4710626111351',),
('4710626111450',),
('4710626622857',),
('47106710',),
('4710685440362',),
('4710731050101',),
('4710740600090',),
('4710857000059',),
('4710892632017',),
('4710908111116',),
('4710908120019',),
('4710908131534',),
('4710908131589',),
('4710908131824',),
('4710943101318',),
('4711001302104',),
('4711022100017',),
('4711080010112',),
('4711258001256',),
('4711258002505',),
('4711258004004',),
('4711258007371',),
('4711271000014',),
('4711271000090',),
('4711271000472',),
('4711524001041',),
('4711587808946',),
('4711587808953',),
('4711663700010',),
('4711985100024',),
('4712019100591',),
('4712019100607',),
('4712054701371',),
('4712425010255',),
('4712425010712',),
('4713089003102',),
('4713754987607',),
('4713754987614',),
('4713985863121',),
('4714082270041',),
('4714381003128',),
('4714981010038',),
('4718433613228',),
('4719090900058',),
('4719090900065',),
('4901422038939',),
('4901550152484',),
('50000301829',),
('719859796117',),
('723125485032',),
('7231254880206',),
('8888021200256',),
('9310052751203',)],
2: [('4710011401128', '4710011401135'),
('4710011401128', '4710011405133'),
('4710011401128', '4710011406123'),
('4710011401128', '4710011409056'),
('4710011401135', '4710011405133'),
('4710011401135', '4710011406123'),
('4710011401135', '4710011409056'),
('4710011405133', '4710011406123'),
('4710011405133', '4710011409056'),
('4710018004605', '4710018004704'),
('4710085120093', '4710085120628'),
('4710085120093', '4710085172696'),
('4710085120628', '4710085172696')],
3: [('4710011401128', '4710011401135', '4710011405133'),
('4710011401128', '4710011405133', '4710011409056')]}
# Report the wall-clock time elapsed since start_time was recorded
end_time = time.time()
time_elapsed = end_time - start_time
print(f'Duration: {time_elapsed}')
Duration: 134.5662498474121
Matheus Schmitz
LinkedIn
GitHub Portfolio