from base64 import standard_b64encode as b64enc
import copy
from collections import defaultdict
from itertools import chain, ifilter, imap
import operator
import os
import sys
import shlex
import traceback
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
import warnings
import heapq

from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
    BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
from pyspark.join import python_join, python_left_outer_join, \
    python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler
from pyspark.storagelevel import StorageLevel

from py4j.java_collections import ListConverter, MapConverter

__all__ = ["RDD"]


def _extract_concise_traceback():
    tb = traceback.extract_stack()
    if len(tb) == 0:
        return "I'm lost!"
    # The last frame is this function itself, which lives inside the PySpark
    # package, so its directory tells us where the PySpark sources are.  Scan
    # the stack for the first frame inside that directory (the first Spark
    # frame) and report its function name together with the user file/line
    # that called into Spark.
    file, line, module, what = tb[len(tb) - 1]
    sparkpath = os.path.dirname(file)
    first_spark_frame = len(tb) - 1
    for i in range(0, len(tb)):
        file, line, fun, what = tb[i]
        if file.startswith(sparkpath):
            first_spark_frame = i
            break
    if first_spark_frame == 0:
        file, line, fun, what = tb[0]
        return "%s at %s:%d" % (fun, file, line)
    sfile, sline, sfun, swhat = tb[first_spark_frame]
    ufile, uline, ufun, uwhat = tb[first_spark_frame - 1]
    return "%s at %s:%d" % (sfun, ufile, uline)


_spark_stack_depth = 0

class _JavaStackTrace(object):
    def __init__(self, sc):
        self._traceback = _extract_concise_traceback()
        self._context = sc

    # The __enter__/__exit__ bodies below are a sketch: they record the Python
    # call site on the JVM context only for the outermost Spark call, assuming
    # the SparkContext exposes a `_jsc` Java context with `setCallSite`.
    def __enter__(self):
        global _spark_stack_depth
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(self._traceback)
        _spark_stack_depth += 1

    def __exit__(self, type, value, tb):
        global _spark_stack_depth
        _spark_stack_depth -= 1
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(None)

class MaxHeapQ(object):
    """
    An implementation of MaxHeap. It retains at most C{maxsize} elements, so
    after inserting more than that only the smallest values remain.

    >>> import pyspark.rdd
    >>> heap = pyspark.rdd.MaxHeapQ(5)
    >>> [heap.insert(i) for i in range(10)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> sorted(heap.getElements())
    [0, 1, 2, 3, 4]
    >>> heap = pyspark.rdd.MaxHeapQ(5)
    >>> [heap.insert(i) for i in range(9, -1, -1)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> sorted(heap.getElements())
    [0, 1, 2, 3, 4]
    >>> heap = pyspark.rdd.MaxHeapQ(1)
    >>> [heap.insert(i) for i in range(9, -1, -1)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> heap.getElements()
    [0]
    """

    def __init__(self, maxsize):
        # The heap starts at q[1], which makes the children of node k live at
        # indices 2 * k and 2 * k + 1.
        self.q = [0]
        self.maxsize = maxsize

    def _swim(self, k):
        while (k > 1) and (self.q[k / 2] < self.q[k]):
            self._swap(k, k / 2)
            k = k / 2

    def _swap(self, i, j):
        t = self.q[i]
        self.q[i] = self.q[j]
        self.q[j] = t

    def _sink(self, k):
        N = self.size()
        while 2 * k <= N:
            j = 2 * k
            # If the node has two children, compare against the larger one;
            # stop sinking once the parent is at least as large as that child.
            if j < N and self.q[j] < self.q[j + 1]:
                j = j + 1
            if self.q[k] > self.q[j]:
                break
            self._swap(k, j)
            k = j

    def size(self):
        return len(self.q) - 1

    def insert(self, value):
        if self.size() < self.maxsize:
            self.q.append(value)
            self._swim(self.size())
        else:
            self._replaceRoot(value)

    def getElements(self):
        return self.q[1:]

    def _replaceRoot(self, value):
        if self.q[1] > value:
            self.q[1] = value
            self._sink(1)

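# Note: MaxHeapQ above is used by RDD.takeOrdered() below to keep only the
# `num` smallest (optionally keyed) elements seen in each partition.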
class RDD(object):
    """
    A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
    Represents an immutable, partitioned collection of elements that can be
    operated on in parallel.
    """

    def __init__(self, jrdd, ctx, jrdd_deserializer):
        self._jrdd = jrdd
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = ctx
        self._jrdd_deserializer = jrdd_deserializer

    def __repr__(self):
        return self._jrdd.toString()

    @property
    def context(self):
        """
        The L{SparkContext} that this RDD was created on.
        """
        return self.ctx

    def cache(self):
        """
        Persist this RDD with the default storage level (C{MEMORY_ONLY}).
        """
        self.is_cached = True
        self._jrdd.cache()
        return self

    def persist(self, storageLevel):
        """
        Set this RDD's storage level to persist its values across operations
        after the first time it is computed. This can only be used to assign
        a new storage level if the RDD does not have a storage level set yet.
        """
        self.is_cached = True
        javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
        self._jrdd.persist(javaStorageLevel)
        return self

    def unpersist(self):
        """
        Mark the RDD as non-persistent, and remove all blocks for it from
        memory and disk.
        """
        self.is_cached = False
        self._jrdd.unpersist()
        return self

    def checkpoint(self):
        """
        Mark this RDD for checkpointing. It will be saved to a file inside the
        checkpoint directory set with L{SparkContext.setCheckpointDir()} and
        all references to its parent RDDs will be removed. This function must
        be called before any job has been executed on this RDD. It is strongly
        recommended that this RDD is persisted in memory, otherwise saving it
        to a file will require recomputation.
        """
        self.is_checkpointed = True
        self._jrdd.rdd().checkpoint()

    def isCheckpointed(self):
        """
        Return whether this RDD has been checkpointed or not.
        """
        return self._jrdd.rdd().isCheckpointed()

    def getCheckpointFile(self):
        """
        Gets the name of the file to which this RDD was checkpointed.
        """
        checkpointFile = self._jrdd.rdd().getCheckpointFile()
        if checkpointFile.isDefined():
            return checkpointFile.get()
        else:
            return None

    def map(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each element of this RDD.
        """
        def func(split, iterator): return imap(f, iterator)
        return PipelinedRDD(self, func, preservesPartitioning)
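    # A minimal usage sketch: sc.parallelize([1, 2, 3]).map(lambda x: x + 1)
    # produces an RDD whose collect() returns [2, 3, 4].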

    def flatMap(self, f, preservesPartitioning=False):
        """
        Return a new RDD by first applying a function to all elements of this
        RDD, and then flattening the results.

        >>> rdd = sc.parallelize([2, 3, 4])
        >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
        [1, 1, 1, 2, 2, 3]
        >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
        [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
        """
        def func(s, iterator): return chain.from_iterable(imap(f, iterator))
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

    def mapPartitions(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each partition of this RDD.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
        >>> def f(iterator): yield sum(iterator)
        >>> rdd.mapPartitions(f).collect()
        [3, 7]
        """
        def func(s, iterator): return f(iterator)
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each partition of this RDD,
        while tracking the index of the original partition.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
        >>> def f(splitIndex, iterator): yield splitIndex
        >>> rdd.mapPartitionsWithIndex(f).sum()
        6
        """
        return PipelinedRDD(self, f, preservesPartitioning)

    def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
        """
        Deprecated: use mapPartitionsWithIndex instead.

        Return a new RDD by applying a function to each partition of this RDD,
        while tracking the index of the original partition.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
        >>> def f(splitIndex, iterator): yield splitIndex
        >>> rdd.mapPartitionsWithSplit(f).sum()
        6
        """
        warnings.warn("mapPartitionsWithSplit is deprecated; "
            "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
        return self.mapPartitionsWithIndex(f, preservesPartitioning)

    def filter(self, f):
        """
        Return a new RDD containing only the elements that satisfy a predicate.

        >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
        >>> rdd.filter(lambda x: x % 2 == 0).collect()
        [2, 4]
        """
        def func(iterator): return ifilter(f, iterator)
        return self.mapPartitions(func)

    def distinct(self):
        """
        Return a new RDD containing the distinct elements in this RDD.

        >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
        [1, 2, 3]
        """
        return self.map(lambda x: (x, None)) \
                   .reduceByKey(lambda x, _: x) \
                   .map(lambda (x, _): x)

    def sample(self, withReplacement, fraction, seed):
        """
        Return a sampled subset of this RDD (relies on numpy and falls back
        on the default random generator if numpy is unavailable).

        >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
        [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
        """
        assert fraction >= 0.0, "Invalid fraction value: %s" % fraction
        return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)

    def takeSample(self, withReplacement, num, seed):
        """
        Return a fixed-size sampled subset of this RDD (currently requires numpy).

        >>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP
        [4, 2, 1, 8, 2, 7, 0, 4, 1, 4]
        """

        fraction = 0.0
        total = 0
        multiplier = 3.0
        initialCount = self.count()
        maxSelected = 0

        if num < 0:
            raise ValueError("num must be non-negative")

        if initialCount == 0:
            return list()

        if initialCount > sys.maxint - 1:
            maxSelected = sys.maxint - 1
        else:
            maxSelected = initialCount

        if num > initialCount and not withReplacement:
            total = maxSelected
            fraction = multiplier * (maxSelected + 1) / initialCount
        else:
            fraction = multiplier * (num + 1) / initialCount
            total = num

        samples = self.sample(withReplacement, fraction, seed).collect()

        # If the first sample did not turn out large enough, keep trying with
        # a new seed; this should be rare because of the generous oversampling
        # multiplier used above.
        while len(samples) < total:
            if seed > sys.maxint - 2:
                seed = -1
            seed += 1
            samples = self.sample(withReplacement, fraction, seed).collect()

        sampler = RDDSampler(withReplacement, fraction, seed + 1)
        sampler.shuffle(samples)
        return samples[0:total]

    def union(self, other):
        """
        Return the union of this RDD and another one.

        >>> rdd = sc.parallelize([1, 1, 2, 3])
        >>> rdd.union(rdd).collect()
        [1, 1, 2, 3, 1, 1, 2, 3]
        """
        if self._jrdd_deserializer == other._jrdd_deserializer:
            rdd = RDD(self._jrdd.union(other._jrdd), self.ctx,
                      self._jrdd_deserializer)
            return rdd
        else:
            # These RDDs contain data in different serialized formats, so we
            # must normalize them to the default serializer before unioning.
            self_copy = self._reserialize()
            other_copy = other._reserialize()
            return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx,
                       self.ctx.serializer)

    def _reserialize(self):
        if self._jrdd_deserializer == self.ctx.serializer:
            return self
        else:
            return self.map(lambda x: x, preservesPartitioning=True)

    def __add__(self, other):
        """
        Return the union of this RDD and another one.

        >>> rdd = sc.parallelize([1, 1, 2, 3])
        >>> (rdd + rdd).collect()
        [1, 1, 2, 3, 1, 1, 2, 3]
        """
        if not isinstance(other, RDD):
            raise TypeError
        return self.union(other)

    def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
        """
        Sorts this RDD, which is assumed to consist of (key, value) pairs.

        >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
        >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
        [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
        >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
        >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
        >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
        [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism

        bounds = list()

        # First compute the boundary of each partition via sampling: we want
        # to partition the key-space into bins such that each bin receives
        # roughly the same number of (key, value) pairs.
        if numPartitions > 1:
            rddSize = self.count()
            maxSampleSize = numPartitions * 20.0  # 20 samples per partition, mirroring Spark's RangePartitioner
            fraction = min(maxSampleSize / max(rddSize, 1), 1.0)

            samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
            samples = sorted(samples, reverse=(not ascending), key=keyfunc)

            # We have numPartitions bins but only need numPartitions - 1
            # boundaries; the last bin is unbounded.
            for i in range(0, numPartitions - 1):
                index = (len(samples) - 1) * (i + 1) / numPartitions
                bounds.append(samples[index])

        def rangePartitionFunc(k):
            p = 0
            while p < len(bounds) and keyfunc(k) > bounds[p]:
                p += 1
            if ascending:
                return p
            else:
                return numPartitions - 1 - p

        def mapFunc(iterator):
            yield sorted(iterator, reverse=(not ascending), key=lambda (k, v): keyfunc(k))

        return (self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc)
                    .mapPartitions(mapFunc, preservesPartitioning=True)
                    .flatMap(lambda x: x, preservesPartitioning=True))
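    # Sketch of the range partitioning above: with the default keyfunc,
    # ascending order and bounds == ['b', 'd'], rangePartitionFunc sends keys
    # <= 'b' to partition 0, keys in ('b', 'd'] to partition 1, and larger
    # keys to partition 2; each partition is then sorted locally by mapFunc.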

    def glom(self):
        """
        Return an RDD created by coalescing all elements within each partition
        into a list.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
        >>> sorted(rdd.glom().collect())
        [[1, 2], [3, 4]]
        """
        def func(iterator): yield list(iterator)
        return self.mapPartitions(func)

    def cartesian(self, other):
        """
        Return the Cartesian product of this RDD and another one, that is, the
        RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
        C{b} is in C{other}.

        >>> rdd = sc.parallelize([1, 2])
        >>> sorted(rdd.cartesian(rdd).collect())
        [(1, 1), (1, 2), (2, 1), (2, 2)]
        """
        # Use a CartesianDeserializer so that pairs of possibly batched
        # elements from the two RDDs are deserialized correctly.
        deserializer = CartesianDeserializer(self._jrdd_deserializer,
                                             other._jrdd_deserializer)
        return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer)

    def groupBy(self, f, numPartitions=None):
        """
        Return an RDD of grouped items.

        >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
        >>> result = rdd.groupBy(lambda x: x % 2).collect()
        >>> sorted([(x, sorted(y)) for (x, y) in result])
        [(0, [2, 8]), (1, [1, 1, 3, 5])]
        """
        return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)

    def pipe(self, command, env={}):
        """
        Return an RDD created by piping elements to a forked external process.

        >>> sc.parallelize([1, 2, 3]).pipe('cat').collect()
        ['1', '2', '3']
        """
        def func(iterator):
            pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
            def pipe_objs(out):
                for obj in iterator:
                    out.write(str(obj).rstrip('\n') + '\n')
                out.close()
            Thread(target=pipe_objs, args=[pipe.stdin]).start()
            return (x.rstrip('\n') for x in pipe.stdout)
        return self.mapPartitions(func)
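    # The writer above runs in a separate thread so that feeding the child
    # process's stdin and reading its stdout can proceed concurrently,
    # avoiding a deadlock on full pipe buffers for large partitions.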

    def foreach(self, f):
        """
        Applies a function to all elements of this RDD.

        >>> def f(x): print x
        >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
        """
        def processPartition(iterator):
            for x in iterator:
                f(x)
            yield None
        self.mapPartitions(processPartition).collect()  # Force evaluation

    def collect(self):
        """
        Return a list that contains all of the elements in this RDD.
        """
        with _JavaStackTrace(self.context) as st:
            bytesInJava = self._jrdd.collect().iterator()
        return list(self._collect_iterator_through_file(bytesInJava))

    def _collect_iterator_through_file(self, iterator):
        # Transferring lots of data through Py4J can be slow because
        # socket.readline() is inefficient.  Instead, we'll dump the data to a
        # temporary file and read it back.
        tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir)
        tempFile.close()
        self.ctx._writeToFile(iterator, tempFile.name)
        # Read the data back into Python and deserialize it:
        with open(tempFile.name, 'rb') as tempFile:
            for item in self._jrdd_deserializer.load_stream(tempFile):
                yield item
        os.unlink(tempFile.name)

    def reduce(self, f):
        """
        Reduces the elements of this RDD using the specified commutative and
        associative binary operator.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
        15
        >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
        10
        """
        def func(iterator):
            acc = None
            for obj in iterator:
                if acc is None:
                    acc = obj
                else:
                    acc = f(obj, acc)
            if acc is not None:
                yield acc
        vals = self.mapPartitions(func).collect()
        return reduce(f, vals)

    def fold(self, zeroValue, op):
        """
        Aggregate the elements of each partition, and then the results for all
        the partitions, using a given associative function and a neutral "zero
        value."

        The function C{op(t1, t2)} is allowed to modify C{t1} and return it
        as its result value to avoid object allocation; however, it should not
        modify C{t2}.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
        15
        """
        def func(iterator):
            acc = zeroValue
            for obj in iterator:
                acc = op(obj, acc)
            yield acc
        vals = self.mapPartitions(func).collect()
        return reduce(op, vals, zeroValue)
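    # Note that the zero value is applied once per partition and once more in
    # the final reduce above, so it must be a neutral element for op.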

    def sum(self):
        """
        Add up the elements in this RDD.

        >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
        6.0
        """
        return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

    def count(self):
        """
        Return the number of elements in this RDD.

        >>> sc.parallelize([2, 3, 4]).count()
        3
        """
        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()

    def stats(self):
        """
        Return a L{StatCounter} object that captures the mean, variance
        and count of the RDD's elements in one operation.
        """
        def redFunc(left_counter, right_counter):
            return left_counter.mergeStats(right_counter)

        return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)

    def mean(self):
        """
        Compute the mean of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).mean()
        2.0
        """
        return self.stats().mean()

    def variance(self):
        """
        Compute the variance of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).variance()
        0.666...
        """
        return self.stats().variance()

    def stdev(self):
        """
        Compute the standard deviation of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).stdev()
        0.816...
        """
        return self.stats().stdev()

    def sampleStdev(self):
        """
        Compute the sample standard deviation of this RDD's elements (which
        corrects for bias in estimating the standard deviation by dividing by
        N-1 instead of N).

        >>> sc.parallelize([1, 2, 3]).sampleStdev()
        1.0
        """
        return self.stats().sampleStdev()

    def sampleVariance(self):
        """
        Compute the sample variance of this RDD's elements (which corrects
        for bias in estimating the variance by dividing by N-1 instead of N).

        >>> sc.parallelize([1, 2, 3]).sampleVariance()
        1.0
        """
        return self.stats().sampleVariance()

    def countByValue(self):
        """
        Return the count of each unique value in this RDD as a dictionary of
        (value, count) pairs.

        >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
        [(1, 2), (2, 3)]
        """
        def countPartition(iterator):
            counts = defaultdict(int)
            for obj in iterator:
                counts[obj] += 1
            yield counts

        def mergeMaps(m1, m2):
            for (k, v) in m2.iteritems():
                m1[k] += v
            return m1
        return self.mapPartitions(countPartition).reduce(mergeMaps)

    def top(self, num):
        """
        Get the top N elements from an RDD.

        Note: It returns the list sorted in descending order.

        >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
        [12]
        >>> sc.parallelize([2, 3, 4, 5, 6], 2).cache().top(2)
        [6, 5]
        """
        def topIterator(iterator):
            q = []
            for k in iterator:
                if len(q) < num:
                    heapq.heappush(q, k)
                else:
                    heapq.heappushpop(q, k)
            yield q

        def merge(a, b):
            return next(topIterator(a + b))

        return sorted(self.mapPartitions(topIterator).reduce(merge), reverse=True)
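    # topIterator keeps at most `num` elements in a heapq min-heap per
    # partition, so the smallest element is evicted first and each partition
    # yields its `num` largest values; merge() then combines those lists the
    # same way before the final descending sort.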

    def takeOrdered(self, num, key=None):
        """
        Get the N elements from an RDD ordered in ascending order or as
        specified by the optional key function.

        >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
        [1, 2, 3, 4, 5, 6]
        >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
        [10, 9, 7, 6, 5, 4]
        """

        def topNKeyedElems(iterator, key_=None):
            q = MaxHeapQ(num)
            for k in iterator:
                if key_ is not None:
                    k = (key_(k), k)
                q.insert(k)
            yield q.getElements()

        def unKey(x, key_=None):
            if key_ is not None:
                x = [i[1] for i in x]
            return x

        def merge(a, b):
            return next(topNKeyedElems(a + b))
        result = self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge)
        return sorted(unKey(result, key), key=key)
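    # When a key function is supplied, each element e is wrapped as (key(e), e)
    # before insertion so the heap orders by the computed key; unKey() strips
    # the wrapper from the merged result.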

    def take(self, num):
        """
        Take the first num elements of the RDD.

        This currently scans the partitions *one by one*, so it will be slow if
        a lot of partitions are required. In that case, use L{collect} to get
        the whole RDD instead.

        >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
        [2, 3]
        >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
        [2, 3, 4, 5, 6]
        """
        def takeUpToNum(iterator):
            taken = 0
            while taken < num:
                yield next(iterator)
                taken += 1

        mapped = self.mapPartitions(takeUpToNum)
        items = []
        # Collect the partitions one at a time, stopping as soon as at least
        # `num` elements have been gathered.
        with _JavaStackTrace(self.context) as st:
            for partition in range(mapped._jrdd.splits().size()):
                partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1)
                partitionsToTake[0] = partition
                iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator()
                items.extend(mapped._collect_iterator_through_file(iterator))
                if len(items) >= num:
                    break
        return items[:num]

    def first(self):
        """
        Return the first element in this RDD.

        >>> sc.parallelize([2, 3, 4]).first()
        2
        """
        return self.take(1)[0]

    def saveAsTextFile(self, path):
        """
        Save this RDD as a text file, using string representations of elements.

        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.close()
        >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
        >>> from fileinput import input
        >>> from glob import glob
        >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
        '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
        """
        def func(split, iterator):
            for x in iterator:
                if not isinstance(x, basestring):
                    x = unicode(x)
                yield x.encode("utf-8")
        keyed = PipelinedRDD(self, func)
        keyed._bypass_serializer = True
        keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)

    # Methods for working with key-value pairs

    def collectAsMap(self):
        """
        Return the key-value pairs in this RDD to the master as a dictionary.

        >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
        >>> m[1]
        2
        >>> m[3]
        4
        """
        return dict(self.collect())

    def reduceByKey(self, func, numPartitions=None):
        """
        Merge the values for each key using an associative reduce function.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        Output will be hash-partitioned with C{numPartitions} partitions, or
        the default parallelism level if C{numPartitions} is not specified.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKey(add).collect())
        [('a', 2), ('b', 1)]
        """
        return self.combineByKey(lambda x: x, func, func, numPartitions)

    def reduceByKeyLocally(self, func):
        """
        Merge the values for each key using an associative reduce function, but
        return the results immediately to the master as a dictionary.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKeyLocally(add).items())
        [('a', 2), ('b', 1)]
        """
        def reducePartition(iterator):
            m = {}
            for (k, v) in iterator:
                m[k] = v if k not in m else func(m[k], v)
            yield m

        def mergeMaps(m1, m2):
            for (k, v) in m2.iteritems():
                m1[k] = v if k not in m1 else func(m1[k], v)
            return m1
        return self.mapPartitions(reducePartition).reduce(mergeMaps)

    def countByKey(self):
        """
        Count the number of elements for each key, and return the result to the
        master as a dictionary.

        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.countByKey().items())
        [('a', 2), ('b', 1)]
        """
        return self.map(lambda x: x[0]).countByValue()

    def join(self, other, numPartitions=None):
        """
        Return an RDD containing all pairs of elements with matching keys in
        C{self} and C{other}.

        Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
        (k, v1) is in C{self} and (k, v2) is in C{other}.

        Performs a hash join across the cluster.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2), ("a", 3)])
        >>> sorted(x.join(y).collect())
        [('a', (1, 2)), ('a', (1, 3))]
        """
        return python_join(self, other, numPartitions)

    def leftOuterJoin(self, other, numPartitions=None):
        """
        Perform a left outer join of C{self} and C{other}.

        For each element (k, v) in C{self}, the resulting RDD will either
        contain all pairs (k, (v, w)) for w in C{other}, or the pair
        (k, (v, None)) if no elements in C{other} have key k.

        Hash-partitions the resulting RDD into the given number of partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(x.leftOuterJoin(y).collect())
        [('a', (1, 2)), ('b', (4, None))]
        """
        return python_left_outer_join(self, other, numPartitions)

    def rightOuterJoin(self, other, numPartitions=None):
        """
        Perform a right outer join of C{self} and C{other}.

        For each element (k, w) in C{other}, the resulting RDD will either
        contain all pairs (k, (v, w)) for v in C{self}, or the pair
        (k, (None, w)) if no elements in C{self} have key k.

        Hash-partitions the resulting RDD into the given number of partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(y.rightOuterJoin(x).collect())
        [('a', (2, 1)), ('b', (None, 4))]
        """
        return python_right_outer_join(self, other, numPartitions)

    def partitionBy(self, numPartitions, partitionFunc=hash):
        """
        Return a copy of the RDD partitioned using the specified partitioner.

        >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
        >>> sets = pairs.partitionBy(2).glom().collect()
        >>> set(sets[0]).intersection(set(sets[1]))
        set([])
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism
        # Transferring O(n) objects to Java is too expensive.  Instead, we form
        # the hash buckets in Python and transfer O(numPartitions) objects to
        # Java: each bucket is emitted as a packed partition id followed by a
        # serialized list of (k, v) pairs.
        outputSerializer = self.ctx._unbatched_serializer

        def add_shuffle_key(split, iterator):
            buckets = defaultdict(list)
            for (k, v) in iterator:
                buckets[partitionFunc(k) % numPartitions].append((k, v))
            for (split, items) in buckets.iteritems():
                yield pack_long(split)
                yield outputSerializer.dumps(items)
        keyed = PipelinedRDD(self, add_shuffle_key)
        keyed._bypass_serializer = True
        with _JavaStackTrace(self.context) as st:
            pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
            partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
                                                          id(partitionFunc))
        jrdd = pairRDD.partitionBy(partitioner).values()
        rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
        # Keep a reference to partitionFunc so that id(partitionFunc) stays
        # unique while this RDD is alive (partitionFunc may be a lambda).
        rdd._partitionFunc = partitionFunc
        return rdd
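    # Sketch of the stream emitted by add_shuffle_key for two buckets: it
    # alternates a packed partition id with a serialized list of (k, v) pairs,
    #   pack_long(0), dumps([(k1, v1), (k3, v3)]), pack_long(1), dumps([(k2, v2)])
    # so the JVM side can route each serialized batch to its target partition.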

    def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                     numPartitions=None):
        """
        Generic function to combine the elements for each key using a custom
        set of aggregation functions.

        Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
        type" C.  Note that V and C can be different -- for example, one might
        group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).

        Users provide three functions:

            - C{createCombiner}, which turns a V into a C (e.g., creates
              a one-element list)
            - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
              a list)
            - C{mergeCombiners}, to combine two C's into a single one.

        In addition, users can control the partitioning of the output RDD.

        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> def f(x): return x
        >>> def add(a, b): return a + str(b)
        >>> sorted(x.combineByKey(str, add, add).collect())
        [('a', '11'), ('b', '1')]
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism

        def combineLocally(iterator):
            combiners = {}
            for x in iterator:
                (k, v) = x
                if k not in combiners:
                    combiners[k] = createCombiner(v)
                else:
                    combiners[k] = mergeValue(combiners[k], v)
            return combiners.iteritems()
        locally_combined = self.mapPartitions(combineLocally)
        shuffled = locally_combined.partitionBy(numPartitions)

        def _mergeCombiners(iterator):
            combiners = {}
            for (k, v) in iterator:
                if k not in combiners:
                    combiners[k] = v
                else:
                    combiners[k] = mergeCombiners(combiners[k], v)
            return combiners.iteritems()
        return shuffled.mapPartitions(_mergeCombiners)
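    # For illustration (hypothetical snippet, not used by this module), a
    # per-key mean can be built on combineByKey:
    #   sums = rdd.combineByKey(lambda v: (v, 1),
    #                           lambda c, v: (c[0] + v, c[1] + 1),
    #                           lambda c1, c2: (c1[0] + c2[0], c1[1] + c2[1]))
    #   means = sums.mapValues(lambda (s, n): float(s) / n)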

    def foldByKey(self, zeroValue, func, numPartitions=None):
        """
        Merge the values for each key using an associative function "func"
        and a neutral "zeroValue" which may be added to the result an
        arbitrary number of times, and must not change the result
        (e.g., 0 for addition, or 1 for multiplication).

        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> from operator import add
        >>> rdd.foldByKey(0, add).collect()
        [('a', 2), ('b', 1)]
        """
        return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions)

    def groupByKey(self, numPartitions=None):
        """
        Group the values for each key in the RDD into a single sequence.
        Hash-partitions the resulting RDD into numPartitions partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(x.groupByKey().collect())
        [('a', [1, 1]), ('b', [1])]
        """

        def createCombiner(x):
            return [x]

        def mergeValue(xs, x):
            xs.append(x)
            return xs

        def mergeCombiners(a, b):
            return a + b

        return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
                numPartitions)

    def flatMapValues(self, f):
        """
        Pass each value in the key-value pair RDD through a flatMap function
        without changing the keys; this also retains the original RDD's
        partitioning.
        """
        flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
        return self.flatMap(flat_map_fn, preservesPartitioning=True)

    def mapValues(self, f):
        """
        Pass each value in the key-value pair RDD through a map function
        without changing the keys; this also retains the original RDD's
        partitioning.
        """
        map_values_fn = lambda (k, v): (k, f(v))
        return self.map(map_values_fn, preservesPartitioning=True)

    def groupWith(self, other):
        """
        Alias for cogroup.
        """
        return self.cogroup(other)

    def cogroup(self, other, numPartitions=None):
        """
        For each key k in C{self} or C{other}, return a resulting RDD that
        contains a tuple with the list of values for that key in C{self} as
        well as C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(x.cogroup(y).collect())
        [('a', ([1], [2])), ('b', ([4], []))]
        """
        return python_cogroup(self, other, numPartitions)

    def subtractByKey(self, other, numPartitions=None):
        """
        Return each (key, value) pair in C{self} that has no pair with matching
        key in C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
        >>> y = sc.parallelize([("a", 3), ("c", None)])
        >>> sorted(x.subtractByKey(y).collect())
        [('b', 4), ('b', 5)]
        """
        filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0
        map_func = lambda (key, vals): [(key, val) for val in vals[0]]
        return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)

    def subtract(self, other, numPartitions=None):
        """
        Return each value in C{self} that is not contained in C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
        >>> y = sc.parallelize([("a", 3), ("c", None)])
        >>> sorted(x.subtract(y).collect())
        [('a', 1), ('b', 4), ('b', 5)]
        """
        # The value True is just a placeholder; only the keys matter to subtractByKey.
        rdd = other.map(lambda x: (x, True))
        return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0])

    def keyBy(self, f):
        """
        Creates tuples of the elements in this RDD by applying C{f}.

        >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
        >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
        >>> sorted(x.cogroup(y).collect())
        [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
        """
        return self.map(lambda x: (f(x), x))

    def repartition(self, numPartitions):
        """
        Return a new RDD that has exactly numPartitions partitions.

        Can increase or decrease the level of parallelism in this RDD.
        Internally, this uses a shuffle to redistribute data. If you are
        decreasing the number of partitions in this RDD, consider using
        `coalesce`, which can avoid performing a shuffle.

        >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
        >>> sorted(rdd.glom().collect())
        [[1], [2, 3], [4, 5], [6, 7]]
        >>> len(rdd.repartition(2).glom().collect())
        2
        >>> len(rdd.repartition(10).glom().collect())
        10
        """
        jrdd = self._jrdd.repartition(numPartitions)
        return RDD(jrdd, self.ctx, self._jrdd_deserializer)

    def coalesce(self, numPartitions, shuffle=False):
        """
        Return a new RDD that is reduced into `numPartitions` partitions.

        >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
        [[1], [2, 3], [4, 5]]
        >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
        [[1, 2, 3, 4, 5]]
        """
        jrdd = self._jrdd.coalesce(numPartitions)
        return RDD(jrdd, self.ctx, self._jrdd_deserializer)

    def zip(self, other):
        """
        Zips this RDD with another one, returning key-value pairs consisting
        of the first element in each RDD, the second element in each RDD, and
        so on. Assumes that the two RDDs have the same number of partitions
        and the same number of elements in each partition (e.g. one was made
        through a map on the other).

        >>> x = sc.parallelize(range(0,5))
        >>> y = sc.parallelize(range(1000, 1005))
        >>> x.zip(y).collect()
        [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
        """
        pairRDD = self._jrdd.zip(other._jrdd)
        deserializer = PairDeserializer(self._jrdd_deserializer,
                                        other._jrdd_deserializer)
        return RDD(pairRDD, self.ctx, deserializer)

    def name(self):
        """
        Return the name of this RDD.
        """
        name_ = self._jrdd.name()
        if not name_:
            return None
        return name_.encode('utf-8')

    def setName(self, name):
        """
        Assign a name to this RDD.

        >>> rdd1 = sc.parallelize([1,2])
        >>> rdd1.setName('RDD1')
        >>> rdd1.name()
        'RDD1'
        """
        self._jrdd.setName(name)

    def toDebugString(self):
        """
        A description of this RDD and its recursive dependencies for debugging.
        """
        debug_string = self._jrdd.toDebugString()
        if not debug_string:
            return None
        return debug_string.encode('utf-8')

    def getStorageLevel(self):
        """
        Get the RDD's current storage level.

        >>> rdd1 = sc.parallelize([1,2])
        >>> rdd1.getStorageLevel()
        StorageLevel(False, False, False, 1)
        """
        java_storage_level = self._jrdd.getStorageLevel()
        storage_level = StorageLevel(java_storage_level.useDisk(),
                                     java_storage_level.useMemory(),
                                     java_storage_level.deserialized(),
                                     java_storage_level.replication())
        return storage_level
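    # The StorageLevel arguments above are, in order: useDisk, useMemory,
    # deserialized and replication, matching the doctest output
    # StorageLevel(False, False, False, 1).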


class PipelinedRDD(RDD):
    """
    Pipelined maps:

    >>> rdd = sc.parallelize([1, 2, 3, 4])
    >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]
    >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]

    Pipelined reduces:
    >>> from operator import add
    >>> rdd.map(lambda x: 2 * x).reduce(add)
    20
    >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
    20
    """
    def __init__(self, prev, func, preservesPartitioning=False):
        if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable():
            # This transformation is the first in its stage:
            self.func = func
            self.preservesPartitioning = preservesPartitioning
            self._prev_jrdd = prev._jrdd
            self._prev_jrdd_deserializer = prev._jrdd_deserializer
        else:
            prev_func = prev.func
            def pipeline_func(split, iterator):
                return func(split, prev_func(split, iterator))
            self.func = pipeline_func
            self.preservesPartitioning = \
                prev.preservesPartitioning and preservesPartitioning
            self._prev_jrdd = prev._prev_jrdd  # maintain the pipeline
            self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = prev.ctx
        self.prev = prev
        self._jrdd_val = None
        self._jrdd_deserializer = self.ctx.serializer
        self._bypass_serializer = False

    @property
    def _jrdd(self):
        if self._jrdd_val:
            return self._jrdd_val
        if self._bypass_serializer:
            serializer = NoOpSerializer()
        else:
            serializer = self.ctx.serializer
        command = (self.func, self._prev_jrdd_deserializer, serializer)
        pickled_command = CloudPickleSerializer().dumps(command)
        broadcast_vars = ListConverter().convert(
            [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
            self.ctx._gateway._gateway_client)
        self.ctx._pickled_broadcast_vars.clear()
        class_tag = self._prev_jrdd.classTag()
        env = MapConverter().convert(self.ctx.environment,
                                     self.ctx._gateway._gateway_client)
        includes = ListConverter().convert(self.ctx._python_includes,
                                           self.ctx._gateway._gateway_client)
        python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
            bytearray(pickled_command), env, includes, self.preservesPartitioning,
            self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator,
            class_tag)
        self._jrdd_val = python_rdd.asJavaRDD()
        return self._jrdd_val

    def _is_pipelinable(self):
        return not (self.is_cached or self.is_checkpointed)


def _test():
    import doctest
    from pyspark.context import SparkContext
    globs = globals().copy()
    # The small batch size here ensures that we exercise multiple batches,
    # even in these small doctest examples:
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()