Sharing Data Among Processes (alternatively, the memory management blues)

[Image: a Blue Screen of Death]

I never thought I’d run into a problem with OS memory management, but it seems I’ve finally crossed that barrier. While writing a Python script to correct missing values in the PNWMoths database, I found out I needed to parallelize the work in some fashion after seeing how long the script was going to take to produce output (it’s O(n^2) with n ≈ 50k; this could be optimized with heuristics, but data accuracy takes huge precedence over optimization). Since I already had a working Python script and the problem was trivially parallel, I assumed it’d be reasonably straightforward to parallelize. And it would have been, had I been using an OS with modern memory management.
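For context, here’s a minimal sketch of why this is a non-problem on an OS with fork(): the parent loads the reference table once, and every forked worker sees that same memory copy-on-write. Windows has no fork(), so multiprocessing spawns fresh interpreters that have to rebuild the data themselves. (CSV_ROWS and the trivial count_matches() below are placeholders of mine, not part of the real script.)

import csv
from multiprocessing import Pool

# Loaded once at module import. On a fork()-based OS (Linux, OS X),
# every worker inherits this table copy-on-write; on Windows, each
# worker re-imports the module and pays for its own load.
with open("csv.csv") as fh:
  CSV_ROWS = list(csv.reader(fh))

def count_matches(row):
  # CSV_ROWS comes "for free" from the parent on fork-based platforms.
  return sum(1 for ref in CSV_ROWS if ref == row)

if __name__ == '__main__':
  with open("db.csv") as fh:
    db_rows = list(csv.reader(fh))
  pool = Pool(4)
  print(pool.map(count_matches, db_rows))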

Since the data I need to share between processes is only ~10MB, I settled on the alternate solution of chunking the work so that each process instance can read a full copy of the CSV into memory without overflowing to disk. Long story short, I got it working with only a minor annoyance courtesy of Windows.
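To make that concrete, the script below splits the db rows with a small chunks() generator and hands one slice to each worker; on a toy list it behaves like this:

>>> list(chunks(range(10), 4))
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]

Each worker then re-reads the ~10MB reference CSV on its own, which sidesteps sharing memory across processes entirely.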

Enjoy the cobbled-together prototype.

 
import csv
import Levenshtein
import multiprocessing
from multiprocessing import Pool

def f(db_r):
  # Worker: each process re-reads the full reference CSV itself, since
  # Windows' multiprocessing can't share the parent's memory. At ~10MB
  # a copy, that's affordable.
  cs = open("csv.csv")
  csv_r = list(csv.reader(cs))
  cs.close()
  perfect = []    # rows that matched a reference row on every field
  unmatched = []  # rows with no candidate match at all
  possible = []   # rows with a ranked list of candidate matches
 
  for r in db_r:
    m = len(r) - r.count("")  # number of non-empty fields in this row
    candidates = []
    perfect_match = False

    for s in csv_r:
      # score: how many fields fuzzily match (Levenshtein ratio > 0.8)
      score = 0
      for i in xrange(0, min(len(s), len(r))):
        if Levenshtein.ratio(r[i], s[i]) > 0.8:
          score += 1
      # every field matched: record it and stop scanning this row
      if score == len(r):
        perfect.append(r)
        perfect_match = True
        break
      # matched at least all the non-empty fields: keep as a candidate
      elif score >= m:
        candidates.append({"score": score, "row": s})

    if perfect_match:
      continue  # already recorded; don't also rank candidates

    if len(candidates) == 0:
      unmatched.append(r)
    else:
      # sort candidates by score so the best match ends up last
      ranked = sorted(candidates, key=lambda c: c["score"])
      possible.append({"candidates": ranked, "row": r})

  return {"worker": multiprocessing.current_process().name,
          "perfect": perfect,
          "unmatched": unmatched,
          "possible": possible}

if __name__ == '__main__':
  db = open("db.csv")
  db_r = list(csv.reader(db))
  db.close()

  def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in xrange(0, len(l), n):
      yield l[i:i+n]

  # split the db rows into one chunk per worker process
  processes = 4
  factor = 1
  db_r = list(chunks(db_r, int(len(db_r) / processes * factor)))

  pool = Pool(processes)
  result = pool.map(f, db_r)

  # dump the grouped results for later manual review
  import pprint
  out = open("out.txt", "w")
  pprint.pprint(result, out)
  out.close()
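For reference, the fuzzy field comparison leans on python-Levenshtein’s ratio(), which returns a similarity between 0 and 1; two fields count as matching when the ratio clears 0.8 (the example strings here are made up):

>>> import Levenshtein
>>> round(Levenshtein.ratio("Papilio", "Papilo"), 3)
0.923
>>> Levenshtein.ratio("moth", "math")
0.75

So a dropped letter in a longer name still matches, while a single substitution in a short field falls below the threshold.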