1  #
2  # Licensed to the Apache Software Foundation (ASF) under one or more
3  # contributor license agreements.  See the NOTICE file distributed with
4  # this work for additional information regarding copyright ownership.
5  # The ASF licenses this file to You under the Apache License, Version 2.0
6  # (the "License"); you may not use this file except in compliance with
7  # the License.  You may obtain a copy of the License at
8  #
9  #    http://www.apache.org/licenses/LICENSE-2.0
10  #
11  # Unless required by applicable law or agreed to in writing, software
12  # distributed under the License is distributed on an "AS IS" BASIS,
13  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  # See the License for the specific language governing permissions and
15  # limitations under the License.
16  #
17  
  18  from base64 import standard_b64encode as b64enc 
  19  import copy 
  20  from collections import defaultdict 
  21  from itertools import chain, ifilter, imap 
  22  import operator 
  23  import os 
  24  import sys 
  25  import shlex 
  26  import traceback 
  27  from subprocess import Popen, PIPE 
  28  from tempfile import NamedTemporaryFile 
  29  from threading import Thread 
  30  import warnings 
  31  import heapq 
  32   
  33  from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ 
  34      BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long 
  35  from pyspark.join import python_join, python_left_outer_join, \ 
  36      python_right_outer_join, python_cogroup 
  37  from pyspark.statcounter import StatCounter 
  38  from pyspark.rddsampler import RDDSampler 
  39  from pyspark.storagelevel import StorageLevel 
  40   
  41  from py4j.java_collections import ListConverter, MapConverter 
  42   
  43  __all__ = ["RDD"] 
44  
45  
46  def _extract_concise_traceback():
47      tb = traceback.extract_stack()
  48      if len(tb) == 0: 
  49          return "I'm lost!" 
  50       
  51       
  52       
  53      file, line, module, what = tb[len(tb) - 1] 
  54      sparkpath = os.path.dirname(file) 
  55      first_spark_frame = len(tb) - 1 
  56      for i in range(0, len(tb)): 
  57          file, line, fun, what = tb[i] 
  58          if file.startswith(sparkpath): 
  59              first_spark_frame = i 
  60              break 
  61      if first_spark_frame == 0: 
  62          file, line, fun, what = tb[0] 
  63          return "%s at %s:%d" % (fun, file, line) 
  64      sfile, sline, sfun, swhat = tb[first_spark_frame] 
  65      ufile, uline, ufun, uwhat = tb[first_spark_frame-1] 
  66      return "%s at %s:%d" % (sfun, ufile, uline) 
   67   
  68  _spark_stack_depth = 0 
69  
70  class _JavaStackTrace(object):
71      def __init__(self, sc):
72          self._traceback = _extract_concise_traceback()
  73          self._context = sc 
   74   
75      def __enter__(self):
76          global _spark_stack_depth
77          if _spark_stack_depth == 0:
78              self._context._jsc.setCallSite(self._traceback)
79          _spark_stack_depth += 1
80  
81      def __exit__(self, type, value, tb):
82          global _spark_stack_depth
83          _spark_stack_depth -= 1
84          if _spark_stack_depth == 0:
85              self._context._jsc.setCallSite(None)
   86   
87  class MaxHeapQ(object):
88      """
  89      An implementation of MaxHeap. 
  90      >>> import pyspark.rdd 
  91      >>> heap = pyspark.rdd.MaxHeapQ(5) 
  92      >>> [heap.insert(i) for i in range(10)] 
  93      [None, None, None, None, None, None, None, None, None, None] 
  94      >>> sorted(heap.getElements()) 
  95      [0, 1, 2, 3, 4] 
  96      >>> heap = pyspark.rdd.MaxHeapQ(5) 
  97      >>> [heap.insert(i) for i in range(9, -1, -1)] 
  98      [None, None, None, None, None, None, None, None, None, None] 
  99      >>> sorted(heap.getElements()) 
 100      [0, 1, 2, 3, 4] 
 101      >>> heap = pyspark.rdd.MaxHeapQ(1) 
 102      >>> [heap.insert(i) for i in range(9, -1, -1)] 
 103      [None, None, None, None, None, None, None, None, None, None] 
 104      >>> heap.getElements() 
 105      [0] 
 106      """ 
 107   
108      def __init__(self, maxsize):
109          # q[0] is a dummy element so the children of q[k] sit at q[2*k] and q[2*k + 1]
 110          self.q = [0] 
 111          self.maxsize = maxsize 
  112   
113      def _swim(self, k):
114          while (k > 1) and (self.q[k/2] < self.q[k]):
 115              self._swap(k, k/2) 
 116              k = k/2 
  117   
118      def _swap(self, i, j):
119          t = self.q[i]
 120          self.q[i] = self.q[j] 
 121          self.q[j] = t 
  122   
123      def _sink(self, k):
124          N = self.size()
 125          while 2 * k <= N: 
 126              j = 2 * k 
 127               
 128               
 129              if j < N and self.q[j] < self.q[j + 1]: 
 130                  j = j + 1 
131              if self.q[k] > self.q[j]:
 132                  break 
 133              self._swap(k, j) 
 134              k = j 
  135   
136      def size(self):
137          return len(self.q) - 1
  138   
139      def insert(self, value):
140          if self.size() < self.maxsize:
 141              self.q.append(value) 
 142              self._swim(self.size()) 
 143          else: 
 144              self._replaceRoot(value) 
  145   
146      def getElements(self):
147          return self.q[1:]
148  
149      def _replaceRoot(self, value):
150          if self.q[1] > value:
 151              self.q[1] = value 
 152              self._sink(1) 
   153   
154  class RDD(object):
155      """
 156      A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. 
 157      Represents an immutable, partitioned collection of elements that can be 
 158      operated on in parallel. 
 159      """ 
 160   
161      def __init__(self, jrdd, ctx, jrdd_deserializer):
  162          self._jrdd = jrdd 
 163          self.is_cached = False 
 164          self.is_checkpointed = False 
 165          self.ctx = ctx 
 166          self._jrdd_deserializer = jrdd_deserializer 
  167   
168      def __repr__(self):
169          return self._jrdd.toString()
  170   
 171      @property 
172      def context(self):
173          """
 174          The L{SparkContext} that this RDD was created on. 
 175          """ 
 176          return self.ctx 
  177   
178      def cache(self):
179          """
 180          Persist this RDD with the default storage level (C{MEMORY_ONLY}). 
 181          """ 
 182          self.is_cached = True 
 183          self._jrdd.cache() 
 184          return self 
  185   
186      def persist(self, storageLevel):
187          """
 188          Set this RDD's storage level to persist its values across operations after the first time 
 189          it is computed. This can only be used to assign a new storage level if the RDD does not 
 190          have a storage level set yet. 
 191          """ 
 192          self.is_cached = True 
 193          javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel) 
 194          self._jrdd.persist(javaStorageLevel) 
 195          return self 
  196   
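    # Usage sketch (added for illustration, not part of the original file): persisting with
    # an explicit storage level, assuming an active SparkContext `sc`:
    #
    #   >>> rdd = sc.parallelize(range(100))
    #   >>> _ = rdd.persist(StorageLevel.MEMORY_ONLY_SER)
    #   >>> rdd.is_cached
    #   True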
197      def unpersist(self):
198          """
 199          Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. 
 200          """ 
 201          self.is_cached = False 
 202          self._jrdd.unpersist() 
 203          return self 
  204   
205      def checkpoint(self):
206          """
 207          Mark this RDD for checkpointing. It will be saved to a file inside the 
 208          checkpoint directory set with L{SparkContext.setCheckpointDir()} and 
 209          all references to its parent RDDs will be removed. This function must 
 210          be called before any job has been executed on this RDD. It is strongly 
 211          recommended that this RDD is persisted in memory, otherwise saving it 
212          to a file will require recomputation.
 213          """ 
 214          self.is_checkpointed = True 
 215          self._jrdd.rdd().checkpoint() 
  216   
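    # Usage sketch (illustrative, not part of the original file); the checkpoint directory
    # below is only a placeholder path:
    #
    #   >>> sc.setCheckpointDir("/tmp/spark-checkpoints")  # doctest: +SKIP
    #   >>> rdd = sc.parallelize(range(10)).cache()
    #   >>> rdd.checkpoint()                               # doctest: +SKIP
    #   >>> rdd.count() and rdd.isCheckpointed()           # an action triggers the checkpoint
    #   True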
217      def isCheckpointed(self):
218          """
 219          Return whether this RDD has been checkpointed or not 
 220          """ 
 221          return self._jrdd.rdd().isCheckpointed() 
  222   
223      def getCheckpointFile(self):
224          """
 225          Gets the name of the file to which this RDD was checkpointed 
 226          """ 
 227          checkpointFile = self._jrdd.rdd().getCheckpointFile() 
 228          if checkpointFile.isDefined(): 
 229              return checkpointFile.get() 
 230          else: 
 231              return None 
  232   
233      def map(self, f, preservesPartitioning=False):
  234          """ 
 235          Return a new RDD by applying a function to each element of this RDD. 
 236          """ 
 237          def func(split, iterator): return imap(f, iterator) 
 238          return PipelinedRDD(self, func, preservesPartitioning) 
  239   
240      def flatMap(self, f, preservesPartitioning=False):
  241          """ 
 242          Return a new RDD by first applying a function to all elements of this 
 243          RDD, and then flattening the results. 
 244   
 245          >>> rdd = sc.parallelize([2, 3, 4]) 
 246          >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect()) 
 247          [1, 1, 1, 2, 2, 3] 
 248          >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect()) 
 249          [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)] 
 250          """ 
 251          def func(s, iterator): return chain.from_iterable(imap(f, iterator)) 
 252          return self.mapPartitionsWithIndex(func, preservesPartitioning) 
  253   
254      def mapPartitions(self, f, preservesPartitioning=False):
255          """
 256          Return a new RDD by applying a function to each partition of this RDD. 
 257   
 258          >>> rdd = sc.parallelize([1, 2, 3, 4], 2) 
 259          >>> def f(iterator): yield sum(iterator) 
 260          >>> rdd.mapPartitions(f).collect() 
 261          [3, 7] 
 262          """ 
 263          def func(s, iterator): return f(iterator) 
264          return self.mapPartitionsWithIndex(func, preservesPartitioning)
  265   
266      def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
267          """
 268          Return a new RDD by applying a function to each partition of this RDD, 
 269          while tracking the index of the original partition. 
 270   
 271          >>> rdd = sc.parallelize([1, 2, 3, 4], 4) 
 272          >>> def f(splitIndex, iterator): yield splitIndex 
 273          >>> rdd.mapPartitionsWithIndex(f).sum() 
 274          6 
 275          """ 
 276          return PipelinedRDD(self, f, preservesPartitioning) 
  277   
278      def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
279          """
 280          Deprecated: use mapPartitionsWithIndex instead. 
 281   
 282          Return a new RDD by applying a function to each partition of this RDD, 
 283          while tracking the index of the original partition. 
 284   
 285          >>> rdd = sc.parallelize([1, 2, 3, 4], 4) 
 286          >>> def f(splitIndex, iterator): yield splitIndex 
 287          >>> rdd.mapPartitionsWithSplit(f).sum() 
 288          6 
 289          """ 
 290          warnings.warn("mapPartitionsWithSplit is deprecated; " 
 291              "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2) 
 292          return self.mapPartitionsWithIndex(f, preservesPartitioning) 
  293   
294      def filter(self, f):
295          """
 296          Return a new RDD containing only the elements that satisfy a predicate. 
 297   
 298          >>> rdd = sc.parallelize([1, 2, 3, 4, 5]) 
 299          >>> rdd.filter(lambda x: x % 2 == 0).collect() 
 300          [2, 4] 
 301          """ 
 302          def func(iterator): return ifilter(f, iterator) 
 303          return self.mapPartitions(func) 
  304   
305      def distinct(self):
306          """
 307          Return a new RDD containing the distinct elements in this RDD. 
 308   
 309          >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect()) 
 310          [1, 2, 3] 
 311          """ 
 312          return self.map(lambda x: (x, None)) \ 
 313                     .reduceByKey(lambda x, _: x) \ 
 314                     .map(lambda (x, _): x) 
  315   
316      def sample(self, withReplacement, fraction, seed):
  317          """ 
 318          Return a sampled subset of this RDD (relies on numpy and falls back 
 319          on default random generator if numpy is unavailable). 
 320   
 321          >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP 
 322          [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98] 
 323          """ 
 324          assert fraction >= 0.0, "Invalid fraction value: %s" % fraction 
 325          return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True) 
  326   
 327       
328      def takeSample(self, withReplacement, num, seed):
  329          """ 
 330          Return a fixed-size sampled subset of this RDD (currently requires numpy). 
 331   
 332          >>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP 
 333          [4, 2, 1, 8, 2, 7, 0, 4, 1, 4] 
 334          """ 
 335   
 336          fraction = 0.0 
 337          total = 0 
 338          multiplier = 3.0 
 339          initialCount = self.count() 
 340          maxSelected = 0 
 341   
 342          if (num < 0): 
 343              raise ValueError 
 344   
 345          if (initialCount == 0): 
 346              return list() 
 347   
 348          if initialCount > sys.maxint - 1: 
 349              maxSelected = sys.maxint - 1 
 350          else: 
 351              maxSelected = initialCount 
 352   
 353          if num > initialCount and not withReplacement: 
 354              total = maxSelected 
 355              fraction = multiplier * (maxSelected + 1) / initialCount 
 356          else: 
 357              fraction = multiplier * (num + 1) / initialCount 
 358              total = num 
 359   
 360          samples = self.sample(withReplacement, fraction, seed).collect() 
 361   
362          # If the first sample didn't turn out large enough, keep trying to take
363          # samples with a different seed; this shouldn't happen often because we
364          # use a generous multiplier for the initial sample size.
 365          while len(samples) < total: 
 366              if seed > sys.maxint - 2: 
 367                  seed = -1 
 368              seed += 1 
 369              samples = self.sample(withReplacement, fraction, seed).collect() 
 370   
 371          sampler = RDDSampler(withReplacement, fraction, seed+1) 
 372          sampler.shuffle(samples) 
 373          return samples[0:total] 
  374   
375      def union(self, other):
376          """
 377          Return the union of this RDD and another one. 
 378   
 379          >>> rdd = sc.parallelize([1, 1, 2, 3]) 
 380          >>> rdd.union(rdd).collect() 
 381          [1, 1, 2, 3, 1, 1, 2, 3] 
 382          """ 
 383          if self._jrdd_deserializer == other._jrdd_deserializer: 
 384              rdd = RDD(self._jrdd.union(other._jrdd), self.ctx, 
 385                        self._jrdd_deserializer) 
 386              return rdd 
 387          else: 
388              # These RDDs contain data in different serialized formats, so we
389              # must normalize them to the default serializer.
 390              self_copy = self._reserialize() 
 391              other_copy = other._reserialize() 
 392              return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx, 
 393                         self.ctx.serializer) 
  394   
395      def _reserialize(self):
396          if self._jrdd_deserializer == self.ctx.serializer:
 397              return self 
 398          else: 
 399              return self.map(lambda x: x, preservesPartitioning=True) 
  400   
401      def __add__(self, other):
402          """
 403          Return the union of this RDD and another one. 
 404   
 405          >>> rdd = sc.parallelize([1, 1, 2, 3]) 
 406          >>> (rdd + rdd).collect() 
 407          [1, 1, 2, 3, 1, 1, 2, 3] 
 408          """ 
 409          if not isinstance(other, RDD): 
 410              raise TypeError 
 411          return self.union(other) 
  412   
413      def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
  414          """ 
 415          Sorts this RDD, which is assumed to consist of (key, value) pairs. 
 416   
 417          >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] 
 418          >>> sc.parallelize(tmp).sortByKey(True, 2).collect() 
 419          [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] 
 420          >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)] 
 421          >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)]) 
 422          >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect() 
 423          [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)] 
 424          """ 
 425          if numPartitions is None: 
 426              numPartitions = self.ctx.defaultParallelism 
 427   
 428          bounds = list() 
 429   
430          # first compute the boundary of each part via sampling: we want to partition
431          # the key-space into bins such that the bins have roughly the same
432          # number of (key, value) pairs falling into them
 433          if numPartitions > 1: 
 434              rddSize = self.count() 
 435              maxSampleSize = numPartitions * 20.0  
 436              fraction = min(maxSampleSize / max(rddSize, 1), 1.0) 
 437   
 438              samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect() 
 439              samples = sorted(samples, reverse=(not ascending), key=keyfunc) 
 440   
441              # we have numPartitions partitions but only need numPartitions - 1
442              # boundary keys; the last partition's upper bound is implicit
 443              for i in range(0, numPartitions - 1): 
 444                  index = (len(samples) - 1) * (i + 1) / numPartitions 
 445                  bounds.append(samples[index]) 
 446   
 447          def rangePartitionFunc(k): 
 448              p = 0 
 449              while p < len(bounds) and keyfunc(k) > bounds[p]: 
 450                  p += 1 
 451              if ascending: 
 452                  return p 
 453              else: 
 454                  return numPartitions-1-p 
  455   
 456          def mapFunc(iterator): 
 457              yield sorted(iterator, reverse=(not ascending), key=lambda (k, v): keyfunc(k)) 
  458   
 459          return (self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc) 
460                      .mapPartitions(mapFunc, preservesPartitioning=True)
 461                      .flatMap(lambda x: x, preservesPartitioning=True)) 
 462   
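    # Additional example (illustrative, not in the original doctests): passing
    # ascending=False sorts the keys in descending order.
    #
    #   >>> sc.parallelize([('b', 1), ('a', 2), ('c', 3)]).sortByKey(False).collect()
    #   [('c', 3), ('b', 1), ('a', 2)]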
463      def glom(self):
464          """
 465          Return an RDD created by coalescing all elements within each partition 
 466          into a list. 
 467   
 468          >>> rdd = sc.parallelize([1, 2, 3, 4], 2) 
 469          >>> sorted(rdd.glom().collect()) 
 470          [[1, 2], [3, 4]] 
 471          """ 
 472          def func(iterator): yield list(iterator) 
 473          return self.mapPartitions(func) 
  474   
475      def cartesian(self, other):
476          """
 477          Return the Cartesian product of this RDD and another one, that is, the 
 478          RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and 
 479          C{b} is in C{other}. 
 480   
 481          >>> rdd = sc.parallelize([1, 2]) 
 482          >>> sorted(rdd.cartesian(rdd).collect()) 
 483          [(1, 1), (1, 2), (2, 1), (2, 2)] 
 484          """ 
485          # Due to batching, we can't use the Java cartesian method directly.
 486          deserializer = CartesianDeserializer(self._jrdd_deserializer, 
 487                                               other._jrdd_deserializer) 
 488          return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer) 
  489   
490      def groupBy(self, f, numPartitions=None):
  491          """ 
 492          Return an RDD of grouped items. 
 493   
 494          >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8]) 
 495          >>> result = rdd.groupBy(lambda x: x % 2).collect() 
 496          >>> sorted([(x, sorted(y)) for (x, y) in result]) 
 497          [(0, [2, 8]), (1, [1, 1, 3, 5])] 
 498          """ 
 499          return self.map(lambda x: (f(x), x)).groupByKey(numPartitions) 
  500   
501      def pipe(self, command, env={}):
  502          """ 
 503          Return an RDD created by piping elements to a forked external process. 
 504   
 505          >>> sc.parallelize([1, 2, 3]).pipe('cat').collect() 
 506          ['1', '2', '3'] 
 507          """ 
 508          def func(iterator): 
 509              pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE) 
 510              def pipe_objs(out): 
 511                  for obj in iterator: 
 512                      out.write(str(obj).rstrip('\n') + '\n') 
 513                  out.close() 
  514              Thread(target=pipe_objs, args=[pipe.stdin]).start() 
 515              return (x.rstrip('\n') for x in pipe.stdout) 
 516          return self.mapPartitions(func) 
 517   
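    # Usage sketch (illustrative, not part of the original file): any executable on the
    # workers' PATH can be used; `grep` here is just an example filter.
    #
    #   >>> sc.parallelize(['apple', 'banana', 'cherry']).pipe('grep an').collect()  # doctest: +SKIP
    #   ['banana']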
518      def foreach(self, f):
519          """
 520          Applies a function to all elements of this RDD. 
 521   
 522          >>> def f(x): print x 
 523          >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f) 
 524          """ 
 525          def processPartition(iterator): 
 526              for x in iterator: 
 527                  f(x) 
 528              yield None 
529          self.mapPartitions(processPartition).collect()  # Force evaluation
 530   
531      def collect(self):
532          """
 533          Return a list that contains all of the elements in this RDD. 
 534          """ 
 535          with _JavaStackTrace(self.context) as st: 
536              bytesInJava = self._jrdd.collect().iterator()
 537          return list(self._collect_iterator_through_file(bytesInJava)) 
  538   
539      def _collect_iterator_through_file(self, iterator):
540          # Transferring lots of data through Py4J can be slow because
541          # socket.readline() is inefficient.  Instead, we'll dump the data to a
542          # temporary file and read it back with our serializer.
 543          tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir) 
 544          tempFile.close() 
 545          self.ctx._writeToFile(iterator, tempFile.name) 
546          # Read the data back into Python and deserialize it:
 547          with open(tempFile.name, 'rb') as tempFile: 
 548              for item in self._jrdd_deserializer.load_stream(tempFile): 
 549                  yield item 
 550          os.unlink(tempFile.name) 
  551   
552      def reduce(self, f):
553          """
 554          Reduces the elements of this RDD using the specified commutative and 
 555          associative binary operator. 
 556   
 557          >>> from operator import add 
 558          >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add) 
 559          15 
 560          >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add) 
 561          10 
 562          """ 
 563          def func(iterator): 
 564              acc = None 
 565              for obj in iterator: 
 566                  if acc is None: 
 567                      acc = obj 
 568                  else: 
 569                      acc = f(obj, acc) 
 570              if acc is not None: 
 571                  yield acc 
  572          vals = self.mapPartitions(func).collect() 
 573          return reduce(f, vals) 
 574   
575      def fold(self, zeroValue, op):
  576          """ 
 577          Aggregate the elements of each partition, and then the results for all 
 578          the partitions, using a given associative function and a neutral "zero 
 579          value." 
 580   
 581          The function C{op(t1, t2)} is allowed to modify C{t1} and return it 
 582          as its result value to avoid object allocation; however, it should not 
 583          modify C{t2}. 
 584   
 585          >>> from operator import add 
 586          >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add) 
 587          15 
 588          """ 
 589          def func(iterator): 
 590              acc = zeroValue 
 591              for obj in iterator: 
 592                  acc = op(obj, acc) 
 593              yield acc 
  594          vals = self.mapPartitions(func).collect() 
 595          return reduce(op, vals, zeroValue) 
 596   
 597       
 598   
599      def sum(self):
600          """
 601          Add up the elements in this RDD. 
 602   
 603          >>> sc.parallelize([1.0, 2.0, 3.0]).sum() 
 604          6.0 
 605          """ 
 606          return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add) 
  607   
608      def count(self):
609          """
 610          Return the number of elements in this RDD. 
 611   
 612          >>> sc.parallelize([2, 3, 4]).count() 
 613          3 
 614          """ 
 615          return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
  616   
617      def stats(self):
618          """
 619          Return a L{StatCounter} object that captures the mean, variance 
 620          and count of the RDD's elements in one operation. 
 621          """ 
 622          def redFunc(left_counter, right_counter): 
 623              return left_counter.mergeStats(right_counter) 
  624   
 625          return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc) 
 626   
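    # Usage sketch (illustrative, not part of the original file): a single stats() pass
    # yields the summary values that mean(), variance() and stdev() expose individually.
    #
    #   >>> s = sc.parallelize([1.0, 2.0, 3.0, 4.0]).stats()
    #   >>> (s.mean(), s.sum(), s.stdev())  # doctest: +SKIP
    #   (2.5, 10.0, 1.118033988749895)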
627      def mean(self):
628          """
 629          Compute the mean of this RDD's elements. 
 630   
 631          >>> sc.parallelize([1, 2, 3]).mean() 
 632          2.0 
 633          """ 
 634          return self.stats().mean() 
  635   
636      def variance(self):
637          """
 638          Compute the variance of this RDD's elements. 
 639   
 640          >>> sc.parallelize([1, 2, 3]).variance() 
 641          0.666... 
 642          """ 
 643          return self.stats().variance() 
  644   
645      def stdev(self):
646          """
 647          Compute the standard deviation of this RDD's elements. 
 648   
 649          >>> sc.parallelize([1, 2, 3]).stdev() 
 650          0.816... 
 651          """ 
 652          return self.stats().stdev() 
  653   
654      def sampleStdev(self):
655          """
 656          Compute the sample standard deviation of this RDD's elements (which corrects for bias in 
 657          estimating the standard deviation by dividing by N-1 instead of N). 
 658   
 659          >>> sc.parallelize([1, 2, 3]).sampleStdev() 
 660          1.0 
 661          """ 
 662          return self.stats().sampleStdev() 
  663   
664      def sampleVariance(self):
665          """
 666          Compute the sample variance of this RDD's elements (which corrects for bias in 
 667          estimating the variance by dividing by N-1 instead of N). 
 668   
 669          >>> sc.parallelize([1, 2, 3]).sampleVariance() 
 670          1.0 
 671          """ 
 672          return self.stats().sampleVariance() 
  673   
674      def countByValue(self):
675          """
 676          Return the count of each unique value in this RDD as a dictionary of 
 677          (value, count) pairs. 
 678   
 679          >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items()) 
 680          [(1, 2), (2, 3)] 
 681          """ 
 682          def countPartition(iterator): 
 683              counts = defaultdict(int) 
 684              for obj in iterator: 
 685                  counts[obj] += 1 
 686              yield counts 
  687          def mergeMaps(m1, m2): 
 688              for (k, v) in m2.iteritems(): 
 689                  m1[k] += v 
 690              return m1 
 691          return self.mapPartitions(countPartition).reduce(mergeMaps) 
 692       
693      def top(self, num):
  694          """ 
695          Get the top N elements from an RDD.
 696   
 697          Note: It returns the list sorted in descending order. 
 698          >>> sc.parallelize([10, 4, 2, 12, 3]).top(1) 
 699          [12] 
 700          >>> sc.parallelize([2, 3, 4, 5, 6], 2).cache().top(2) 
 701          [6, 5] 
 702          """ 
 703          def topIterator(iterator): 
 704              q = [] 
 705              for k in iterator: 
 706                  if len(q) < num: 
 707                      heapq.heappush(q, k) 
 708                  else: 
 709                      heapq.heappushpop(q, k) 
 710              yield q 
  711   
 712          def merge(a, b): 
 713              return next(topIterator(a + b)) 
 714   
 715          return sorted(self.mapPartitions(topIterator).reduce(merge), reverse=True) 
 716   
717      def takeOrdered(self, num, key=None):
718          """
719          Get the N elements from an RDD ordered in ascending order or as specified
 720          by the optional key function.  
 721   
 722          >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6) 
 723          [1, 2, 3, 4, 5, 6] 
 724          >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x) 
 725          [10, 9, 7, 6, 5, 4] 
 726          """ 
 727   
 728          def topNKeyedElems(iterator, key_=None): 
 729              q = MaxHeapQ(num) 
 730              for k in iterator: 
 731                  if key_ != None: 
 732                      k = (key_(k), k) 
 733                  q.insert(k) 
 734              yield q.getElements() 
  735   
 736          def unKey(x, key_=None): 
 737              if key_ != None: 
 738                  x = [i[1] for i in x] 
 739              return x 
 740           
 741          def merge(a, b): 
 742              return next(topNKeyedElems(a + b)) 
 743          result = self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge) 
 744          return sorted(unKey(result, key), key=key) 
 745   
 746   
747      def take(self, num):
  748          """ 
 749          Take the first num elements of the RDD. 
 750   
 751          This currently scans the partitions *one by one*, so it will be slow if 
 752          a lot of partitions are required. In that case, use L{collect} to get 
 753          the whole RDD instead. 
 754   
 755          >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2) 
 756          [2, 3] 
 757          >>> sc.parallelize([2, 3, 4, 5, 6]).take(10) 
 758          [2, 3, 4, 5, 6] 
 759          """ 
 760          def takeUpToNum(iterator): 
 761              taken = 0 
 762              while taken < num: 
 763                  yield next(iterator) 
 764                  taken += 1 
  765           
 766          mapped = self.mapPartitions(takeUpToNum) 
 767          items = [] 
768          # Partitions are scanned one at a time: each pass collects a single
769          # partition from the JVM and stops as soon as enough items have been taken.
770  
 771          with _JavaStackTrace(self.context) as st: 
 772              for partition in range(mapped._jrdd.splits().size()): 
 773                  partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1) 
 774                  partitionsToTake[0] = partition 
 775                  iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator() 
 776                  items.extend(mapped._collect_iterator_through_file(iterator)) 
 777                  if len(items) >= num: 
 778                      break 
 779          return items[:num] 
 780   
781      def first(self):
782          """
 783          Return the first element in this RDD. 
 784   
 785          >>> sc.parallelize([2, 3, 4]).first() 
 786          2 
 787          """ 
 788          return self.take(1)[0] 
  789   
790      def saveAsTextFile(self, path):
  791          """ 
 792          Save this RDD as a text file, using string representations of elements. 
 793   
 794          >>> tempFile = NamedTemporaryFile(delete=True) 
 795          >>> tempFile.close() 
 796          >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name) 
 797          >>> from fileinput import input 
 798          >>> from glob import glob 
 799          >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*")))) 
 800          '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n' 
 801          """ 
 802          def func(split, iterator): 
 803              for x in iterator: 
 804                  if not isinstance(x, basestring): 
 805                      x = unicode(x) 
 806                  yield x.encode("utf-8") 
  807          keyed = PipelinedRDD(self, func) 
 808          keyed._bypass_serializer = True 
 809          keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path) 
 810   
 811       
 812   
813      def collectAsMap(self):
814          """
 815          Return the key-value pairs in this RDD to the master as a dictionary. 
 816   
 817          >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap() 
 818          >>> m[1] 
 819          2 
 820          >>> m[3] 
 821          4 
 822          """ 
 823          return dict(self.collect()) 
  824   
825      def reduceByKey(self, func, numPartitions=None):
826          """
 827          Merge the values for each key using an associative reduce function. 
 828   
 829          This will also perform the merging locally on each mapper before 
 830          sending results to a reducer, similarly to a "combiner" in MapReduce. 
 831   
 832          Output will be hash-partitioned with C{numPartitions} partitions, or 
 833          the default parallelism level if C{numPartitions} is not specified. 
 834   
 835          >>> from operator import add 
 836          >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
 837          >>> sorted(rdd.reduceByKey(add).collect()) 
 838          [('a', 2), ('b', 1)] 
 839          """ 
 840          return self.combineByKey(lambda x: x, func, func, numPartitions) 
  841   
842      def reduceByKeyLocally(self, func):
843          """
 844          Merge the values for each key using an associative reduce function, but 
 845          return the results immediately to the master as a dictionary. 
 846   
 847          This will also perform the merging locally on each mapper before 
 848          sending results to a reducer, similarly to a "combiner" in MapReduce. 
 849   
 850          >>> from operator import add 
 851          >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
 852          >>> sorted(rdd.reduceByKeyLocally(add).items()) 
 853          [('a', 2), ('b', 1)] 
 854          """ 
 855          def reducePartition(iterator): 
 856              m = {} 
 857              for (k, v) in iterator: 
 858                  m[k] = v if k not in m else func(m[k], v) 
 859              yield m 
  860          def mergeMaps(m1, m2): 
 861              for (k, v) in m2.iteritems(): 
 862                  m1[k] = v if k not in m1 else func(m1[k], v) 
 863              return m1 
 864          return self.mapPartitions(reducePartition).reduce(mergeMaps) 
 865   
866      def countByKey(self):
867          """
 868          Count the number of elements for each key, and return the result to the 
 869          master as a dictionary. 
 870   
 871          >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
 872          >>> sorted(rdd.countByKey().items()) 
 873          [('a', 2), ('b', 1)] 
 874          """ 
 875          return self.map(lambda x: x[0]).countByValue() 
  876   
877      def join(self, other, numPartitions=None):
  878          """ 
 879          Return an RDD containing all pairs of elements with matching keys in 
 880          C{self} and C{other}. 
 881   
 882          Each pair of elements will be returned as a (k, (v1, v2)) tuple, where 
 883          (k, v1) is in C{self} and (k, v2) is in C{other}. 
 884   
 885          Performs a hash join across the cluster. 
 886   
 887          >>> x = sc.parallelize([("a", 1), ("b", 4)]) 
 888          >>> y = sc.parallelize([("a", 2), ("a", 3)]) 
 889          >>> sorted(x.join(y).collect()) 
 890          [('a', (1, 2)), ('a', (1, 3))] 
 891          """ 
 892          return python_join(self, other, numPartitions) 
  893   
894      def leftOuterJoin(self, other, numPartitions=None):
895          """
 896          Perform a left outer join of C{self} and C{other}. 
 897   
 898          For each element (k, v) in C{self}, the resulting RDD will either 
 899          contain all pairs (k, (v, w)) for w in C{other}, or the pair 
 900          (k, (v, None)) if no elements in other have key k. 
 901   
 902          Hash-partitions the resulting RDD into the given number of partitions. 
 903   
 904          >>> x = sc.parallelize([("a", 1), ("b", 4)]) 
 905          >>> y = sc.parallelize([("a", 2)]) 
 906          >>> sorted(x.leftOuterJoin(y).collect()) 
 907          [('a', (1, 2)), ('b', (4, None))] 
 908          """ 
 909          return python_left_outer_join(self, other, numPartitions) 
  910   
911      def rightOuterJoin(self, other, numPartitions=None):
912          """
 913          Perform a right outer join of C{self} and C{other}. 
 914   
 915          For each element (k, w) in C{other}, the resulting RDD will either 
 916          contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w)) 
 917          if no elements in C{self} have key k. 
 918   
 919          Hash-partitions the resulting RDD into the given number of partitions. 
 920   
 921          >>> x = sc.parallelize([("a", 1), ("b", 4)]) 
 922          >>> y = sc.parallelize([("a", 2)]) 
 923          >>> sorted(y.rightOuterJoin(x).collect()) 
 924          [('a', (2, 1)), ('b', (None, 4))] 
 925          """ 
 926          return python_right_outer_join(self, other, numPartitions) 
  927   
 928       
929      def partitionBy(self, numPartitions, partitionFunc=None):
  930          """ 
 931          Return a copy of the RDD partitioned using the specified partitioner. 
 932   
 933          >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x)) 
 934          >>> sets = pairs.partitionBy(2).glom().collect() 
 935          >>> set(sets[0]).intersection(set(sets[1])) 
 936          set([]) 
 937          """ 
 938          if numPartitions is None: 
 939              numPartitions = self.ctx.defaultParallelism 
 940   
 941          if partitionFunc is None: 
 942              partitionFunc = lambda x: 0 if x is None else hash(x) 
943          # Transferring O(n) objects to Java is too expensive.  Instead, we'll
944          # form the hash buckets in Python, transferring O(numPartitions) objects
945          # to Java.  Each object is a (splitNumber, [objects]) pair.
 946          outputSerializer = self.ctx._unbatched_serializer 
 947          def add_shuffle_key(split, iterator): 
 948   
 949              buckets = defaultdict(list) 
 950   
 951              for (k, v) in iterator: 
 952                  buckets[partitionFunc(k) % numPartitions].append((k, v)) 
 953              for (split, items) in buckets.iteritems(): 
 954                  yield pack_long(split) 
 955                  yield outputSerializer.dumps(items) 
  956          keyed = PipelinedRDD(self, add_shuffle_key) 
 957          keyed._bypass_serializer = True 
 958          with _JavaStackTrace(self.context) as st: 
 959              pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD() 
 960              partitioner = self.ctx._jvm.PythonPartitioner(numPartitions, 
 961                                                            id(partitionFunc)) 
 962          jrdd = pairRDD.partitionBy(partitioner).values() 
 963          rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer)) 
964          # Keep a reference to the partition function so that id(partitionFunc)
965          # stays unique for the lifetime of this RDD (even when it is a lambda):
 966          rdd._partitionFunc = partitionFunc 
 967          return rdd 
 968   
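    # Usage sketch (illustrative, not part of the original file): partitionFunc may be any
    # callable mapping a key to an int; here keys are routed by their string length.
    #
    #   >>> pairs = sc.parallelize([("a", 1), ("bb", 2), ("ccc", 3), ("dd", 4)])
    #   >>> len(pairs.partitionBy(2, partitionFunc=lambda k: len(k)).glom().collect())
    #   2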
 969       
970      def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
 971                       numPartitions=None): 
  972          """ 
 973          Generic function to combine the elements for each key using a custom 
 974          set of aggregation functions. 
 975   
 976          Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined 
 977          type" C.  Note that V and C can be different -- for example, one might 
 978          group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]). 
 979   
 980          Users provide three functions: 
 981   
 982              - C{createCombiner}, which turns a V into a C (e.g., creates 
 983                a one-element list) 
 984              - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of 
 985                a list) 
 986              - C{mergeCombiners}, to combine two C's into a single one. 
 987   
 988          In addition, users can control the partitioning of the output RDD. 
 989   
 990          >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
 991          >>> def f(x): return x 
 992          >>> def add(a, b): return a + str(b) 
 993          >>> sorted(x.combineByKey(str, add, add).collect()) 
 994          [('a', '11'), ('b', '1')] 
 995          """ 
 996          if numPartitions is None: 
 997              numPartitions = self.ctx.defaultParallelism 
 998          def combineLocally(iterator): 
 999              combiners = {} 
1000              for x in iterator: 
1001                  (k, v) = x 
1002                  if k not in combiners: 
1003                      combiners[k] = createCombiner(v) 
1004                  else: 
1005                      combiners[k] = mergeValue(combiners[k], v) 
1006              return combiners.iteritems() 
 1007          locally_combined = self.mapPartitions(combineLocally) 
1008          shuffled = locally_combined.partitionBy(numPartitions) 
1009          def _mergeCombiners(iterator): 
1010              combiners = {} 
1011              for (k, v) in iterator: 
1012                  if k not in combiners:
1013                      combiners[k] = v 
1014                  else: 
1015                      combiners[k] = mergeCombiners(combiners[k], v) 
1016              return combiners.iteritems() 
1017          return shuffled.mapPartitions(_mergeCombiners) 
1018       
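    # Usage sketch (illustrative, not part of the original file): the classic per-key
    # average, carrying (sum, count) pairs as the combined type C.
    #
    #   >>> data = sc.parallelize([("a", 2.0), ("a", 4.0), ("b", 6.0)])
    #   >>> sumCount = data.combineByKey(lambda v: (v, 1),
    #   ...                              lambda c, v: (c[0] + v, c[1] + 1),
    #   ...                              lambda c1, c2: (c1[0] + c2[0], c1[1] + c2[1]))
    #   >>> sorted(sumCount.mapValues(lambda (s, n): s / n).collect())
    #   [('a', 3.0), ('b', 6.0)]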
1019      def foldByKey(self, zeroValue, func, numPartitions=None):
 1020          """ 
1021          Merge the values for each key using an associative function "func" and a neutral
1022          "zeroValue" which may be added to the result an arbitrary number of times and must
1023          not change the result (e.g., 0 for addition, or 1 for multiplication).
1024   
1025          >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
1026          >>> from operator import add 
1027          >>> rdd.foldByKey(0, add).collect() 
1028          [('a', 2), ('b', 1)] 
1029          """ 
1030          return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions) 
 1031       
1032       
1033       
1034      def groupByKey(self, numPartitions=None):
1035          """
1036          Group the values for each key in the RDD into a single sequence. 
1037          Hash-partitions the resulting RDD into numPartitions partitions.
1038   
1039          >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
1040          >>> sorted(x.groupByKey().collect()) 
1041          [('a', [1, 1]), ('b', [1])] 
1042          """ 
1043   
1044          def createCombiner(x): 
1045              return [x] 
 1046   
1047          def mergeValue(xs, x): 
1048              xs.append(x) 
1049              return xs 
1050   
1051          def mergeCombiners(a, b): 
1052              return a + b 
1053   
1054          return self.combineByKey(createCombiner, mergeValue, mergeCombiners, 
1055                  numPartitions) 
1056   
1057       
1058      def flatMapValues(self, f):
1059          """
1060          Pass each value in the key-value pair RDD through a flatMap function 
1061          without changing the keys; this also retains the original RDD's 
1062          partitioning. 
1063          """ 
1064          flat_map_fn = lambda (k, v): ((k, x) for x in f(v)) 
1065          return self.flatMap(flat_map_fn, preservesPartitioning=True) 
 1066   
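    # Example (illustrative, not in the original docstring):
    #
    #   >>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
    #   >>> sorted(x.flatMapValues(lambda value: value).collect())
    #   [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]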
1067      def mapValues(self, f):
1068          """
1069          Pass each value in the key-value pair RDD through a map function 
1070          without changing the keys; this also retains the original RDD's 
1071          partitioning. 
1072          """ 
1073          map_values_fn = lambda (k, v): (k, f(v)) 
1074          return self.map(map_values_fn, preservesPartitioning=True) 
 1075   
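    # Example (illustrative, not in the original docstring):
    #
    #   >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
    #   >>> sorted(x.mapValues(len).collect())
    #   [('a', 3), ('b', 1)]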
1076       
1077      def groupWith(self, other):
1078          """
1079          Alias for cogroup. 
1080          """ 
1081          return self.cogroup(other) 
 1082   
1083       
1084      def cogroup(self, other, numPartitions=None):
 1085          """ 
1086          For each key k in C{self} or C{other}, return a resulting RDD that 
1087          contains a tuple with the list of values for that key in C{self} as well 
1088          as C{other}. 
1089   
1090          >>> x = sc.parallelize([("a", 1), ("b", 4)]) 
1091          >>> y = sc.parallelize([("a", 2)]) 
1092          >>> sorted(x.cogroup(y).collect()) 
1093          [('a', ([1], [2])), ('b', ([4], []))] 
1094          """ 
1095          return python_cogroup(self, other, numPartitions) 
 1096   
1097      def subtractByKey(self, other, numPartitions=None):
1098          """
1099          Return each (key, value) pair in C{self} that has no pair with matching key 
1100          in C{other}. 
1101   
1102          >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)]) 
1103          >>> y = sc.parallelize([("a", 3), ("c", None)]) 
1104          >>> sorted(x.subtractByKey(y).collect()) 
1105          [('b', 4), ('b', 5)] 
1106          """ 
1107          filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0 
1108          map_func = lambda (key, vals): [(key, val) for val in vals[0]] 
1109          return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func) 
 1110   
1111      def subtract(self, other, numPartitions=None):
 1112          """ 
1113          Return each value in C{self} that is not contained in C{other}. 
1114   
1115          >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)]) 
1116          >>> y = sc.parallelize([("a", 3), ("c", None)]) 
1117          >>> sorted(x.subtract(y).collect()) 
1118          [('a', 1), ('b', 4), ('b', 5)] 
1119          """ 
1120          rdd = other.map(lambda x: (x, True))  # note: 'True' is just a placeholder value
1121          return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0])
 1122   
1123      def keyBy(self, f):
1124          """
1125          Creates tuples of the elements in this RDD by applying C{f}. 
1126   
1127          >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x) 
1128          >>> y = sc.parallelize(zip(range(0,5), range(0,5))) 
1129          >>> sorted(x.cogroup(y).collect()) 
1130          [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))] 
1131          """ 
1132          return self.map(lambda x: (f(x), x)) 
 1133   
1134      def repartition(self, numPartitions):
1135          """
1136           Return a new RDD that has exactly numPartitions partitions. 
1137             
1138           Can increase or decrease the level of parallelism in this RDD. Internally, this uses 
1139           a shuffle to redistribute data. 
1140           If you are decreasing the number of partitions in this RDD, consider using `coalesce`, 
1141           which can avoid performing a shuffle. 
1142           >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4) 
1143           >>> sorted(rdd.glom().collect()) 
1144           [[1], [2, 3], [4, 5], [6, 7]] 
1145           >>> len(rdd.repartition(2).glom().collect()) 
1146           2 
1147           >>> len(rdd.repartition(10).glom().collect()) 
1148           10 
1149          """ 
1150          jrdd = self._jrdd.repartition(numPartitions) 
1151          return RDD(jrdd, self.ctx, self._jrdd_deserializer) 
 1152   
1153      def coalesce(self, numPartitions, shuffle=False):
 1154          """ 
1155          Return a new RDD that is reduced into `numPartitions` partitions. 
1156          >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect() 
1157          [[1], [2, 3], [4, 5]] 
1158          >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect() 
1159          [[1, 2, 3, 4, 5]] 
1160          """ 
1161          jrdd = self._jrdd.coalesce(numPartitions, shuffle)
1162          return RDD(jrdd, self.ctx, self._jrdd_deserializer) 
 1163   
1164      def zip(self, other):
 1165          """ 
1166          Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
1167          the second element in each RDD, and so on. Assumes that the two RDDs have the same number of
1168          partitions and the same number of elements in each partition (e.g. one was made through 
1169          a map on the other). 
1170   
1171          >>> x = sc.parallelize(range(0,5)) 
1172          >>> y = sc.parallelize(range(1000, 1005)) 
1173          >>> x.zip(y).collect() 
1174          [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)] 
1175          """ 
1176          pairRDD = self._jrdd.zip(other._jrdd) 
1177          deserializer = PairDeserializer(self._jrdd_deserializer, 
1178                                               other._jrdd_deserializer) 
1179          return RDD(pairRDD, self.ctx, deserializer) 
 1180   
1181      def name(self):
1182          """
1183          Return the name of this RDD. 
1184          """ 
1185          name_ = self._jrdd.name() 
1186          if not name_: 
1187              return None 
1188          return name_.encode('utf-8') 
 1189   
1190      def setName(self, name):
1191          """
1192          Assign a name to this RDD. 
1193          >>> rdd1 = sc.parallelize([1,2]) 
1194          >>> rdd1.setName('RDD1') 
1195          >>> rdd1.name() 
1196          'RDD1' 
1197          """ 
1198          self._jrdd.setName(name) 
 1199   
1200      def toDebugString(self):
1201          """
1202          A description of this RDD and its recursive dependencies for debugging. 
1203          """ 
1204          debug_string = self._jrdd.toDebugString() 
1205          if not debug_string: 
1206              return None 
1207          return debug_string.encode('utf-8') 
 1208   
1209      def getStorageLevel(self):
1210          """
1211          Get the RDD's current storage level. 
1212          >>> rdd1 = sc.parallelize([1,2]) 
1213          >>> rdd1.getStorageLevel() 
1214          StorageLevel(False, False, False, 1) 
1215          """ 
1216          java_storage_level = self._jrdd.getStorageLevel() 
1217          storage_level = StorageLevel(java_storage_level.useDisk(), 
1218                                       java_storage_level.useMemory(), 
1219                                       java_storage_level.deserialized(), 
1220                                       java_storage_level.replication()) 
1221          return storage_level 
 1222   
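# Usage sketch (illustrative, not part of the original file): after an explicit persist(),
# the reported level reflects the (useDisk, useMemory, deserialized, replication) flags.
#
#   >>> rdd2 = sc.parallelize([1, 2]).persist(StorageLevel.MEMORY_AND_DISK)
#   >>> rdd2.getStorageLevel()  # doctest: +SKIP
#   StorageLevel(True, True, True, 1)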
1229  class PipelinedRDD(RDD):
1230      """
1231      Pipelined maps: 
1232      >>> rdd = sc.parallelize([1, 2, 3, 4]) 
1233      >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect() 
1234      [4, 8, 12, 16] 
1235      >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect() 
1236      [4, 8, 12, 16] 
1237   
1238      Pipelined reduces: 
1239      >>> from operator import add 
1240      >>> rdd.map(lambda x: 2 * x).reduce(add) 
1241      20 
1242      >>> rdd.flatMap(lambda x: [x, x]).reduce(add) 
1243      20 
1244      """ 
1245      def __init__(self, prev, func, preservesPartitioning=False):
 1246          if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable(): 
1247              # This transformation is the first in its stage:
1248              self.func = func 
1249              self.preservesPartitioning = preservesPartitioning 
1250              self._prev_jrdd = prev._jrdd 
1251              self._prev_jrdd_deserializer = prev._jrdd_deserializer 
1252          else: 
1253              prev_func = prev.func 
1254              def pipeline_func(split, iterator): 
1255                  return func(split, prev_func(split, iterator)) 
 1256              self.func = pipeline_func 
1257              self.preservesPartitioning = \ 
1258                  prev.preservesPartitioning and preservesPartitioning 
1259              self._prev_jrdd = prev._prev_jrdd   
1260              self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer 
1261          self.is_cached = False 
1262          self.is_checkpointed = False 
1263          self.ctx = prev.ctx 
1264          self.prev = prev 
1265          self._jrdd_val = None 
1266          self._jrdd_deserializer = self.ctx.serializer 
1267          self._bypass_serializer = False 
 1268   
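    # Illustration (added, not part of the original file): consecutive narrow
    # transformations are fused into one PipelinedRDD, so a chain of map/filter calls
    # runs as a single Python stage per partition.
    #
    #   >>> import pyspark.rdd
    #   >>> r = sc.parallelize(range(4)).map(str).filter(lambda s: s != '0')
    #   >>> isinstance(r, pyspark.rdd.PipelinedRDD)
    #   True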
1269      @property 
1270      def _jrdd(self):
1271          if self._jrdd_val:
1272              return self._jrdd_val 
1273          if self._bypass_serializer: 
1274              serializer = NoOpSerializer() 
1275          else: 
1276              serializer = self.ctx.serializer 
1277          command = (self.func, self._prev_jrdd_deserializer, serializer) 
1278          pickled_command = CloudPickleSerializer().dumps(command) 
1279          broadcast_vars = ListConverter().convert( 
1280              [x._jbroadcast for x in self.ctx._pickled_broadcast_vars], 
1281              self.ctx._gateway._gateway_client) 
1282          self.ctx._pickled_broadcast_vars.clear() 
1283          class_tag = self._prev_jrdd.classTag() 
1284          env = MapConverter().convert(self.ctx.environment, 
1285                                       self.ctx._gateway._gateway_client) 
1286          includes = ListConverter().convert(self.ctx._python_includes, 
1287                                       self.ctx._gateway._gateway_client) 
1288          python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), 
1289              bytearray(pickled_command), env, includes, self.preservesPartitioning, 
1290              self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator, 
1291              class_tag) 
1292          self._jrdd_val = python_rdd.asJavaRDD() 
1293          return self._jrdd_val 
 1294   
1295      def _is_pipelinable(self):
1296          return not (self.is_cached or self.is_checkpointed)
 1297   
1298  
1299  def _test():
1300      import doctest
1301      from pyspark.context import SparkContext 
1302      globs = globals().copy() 
1303      # The small batch size here ensures that we see multiple batches,
1304      # even in these small test examples:
1305      globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) 
1306      (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
1307      globs['sc'].stop() 
1308      if failure_count: 
1309          exit(-1) 
 1310   
1311   
1312  if __name__ == "__main__": 
1313      _test() 
1314