How to Run Apache Spark from a Container

Apache Spark is a large-scale data analytics engine that can use distributed computing resources. It supports common data science languages such as Python and R; its Python support is provided through the PySpark package. Some advantages of using PySpark over plain Python with NumPy and pandas are:

  • Speed. Spark can distribute work across multiple computers, so you don’t have to write your own parallel computing code. Speed gains of up to 100x have been claimed for Spark.
  • Scale. You can develop code on a laptop and deploy it on a cluster to process data at scale.
  • Robustness. A job won’t fail if some nodes go offline during execution.

Other features, such as Spark SQL, Spark ML, and support for streaming data sources, bring additional advantages.

After a quick tryout of the Spark container image from Bitnami, I moved on to another image released by the Jupyter Docker Stacks project, which has good documentation. To run the container, expose the Jupyter notebook, and share the current host directory with the container, use this command:

docker run -d --name spark -p 80:8888 -p 4040:4040 -p 4041:4041 -v ${PWD}:/home/jovyan jupyter/all-spark-notebook
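
For scripted setups, the same command can be assembled in Python so that the port, mount directory, and container name become parameters. This is only a minimal sketch: `build_docker_run` and its parameters are hypothetical names introduced here, and the defaults simply mirror the command above.

```python
import os
import subprocess


def build_docker_run(image='jupyter/all-spark-notebook', host_port=80,
                     host_dir=None, name=None):
    """Return the docker run argument list for the notebook container."""
    host_dir = host_dir or os.getcwd()
    cmd = ['docker', 'run', '-d',
           '-p', f'{host_port}:8888',          # Jupyter notebook server
           '-p', '4040:4040',                  # Spark web UI
           '-p', '4041:4041',                  # UI of a second Spark application
           '-v', f'{host_dir}:/home/jovyan']   # share a host directory
    if name:
        cmd += ['--name', name]                # optional container name
    cmd.append(image)
    return cmd


def start_container(**kwargs):
    """Start the container; requires Docker to be installed and running."""
    return subprocess.run(build_docker_run(**kwargs), check=True)
```

Calling `start_container(name='spark')` would launch the container under the name `spark`; passing the arguments as a list avoids shell quoting issues with directory names that contain spaces.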

If you need to install additional packages into the provided container image, you can do so either by going inside the container (“docker exec -it spark /bin/bash”, where spark is the name of the running container) or by modifying the original docker-compose.yml file.

Extract Hidden URLs from PDF files

To search for URLs in a PDF file, one can use the built-in search function of a PDF reader and look for strings like “http”. However, when a URL is hidden, a simple string search in the reader will not find it. A more reliable way to get all the URLs is to convert the PDF file to text with the “pdftotext” program, then use the “grep” command to catch the URLs. The “-raw” option keeps the text in content-stream order. The example below converts the file to text, identifies the generated txt file as the most recently modified file in the directory, greps it for “http”, then deletes the txt file.

pdftotext -raw ml.pdf && file=$(ls -tr | tail -1); grep http "$file"; rm "$file"
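
The same idea can be sketched in Python, which avoids guessing which file was written last. The function names and the URL pattern below are assumptions made for illustration; the pattern is deliberately simple and may need tuning for unusual URLs. Passing “-” as the output file makes pdftotext write to standard output.

```python
import re
import subprocess

# Simple pattern: a scheme followed by characters that rarely end a URL.
URL_PATTERN = re.compile(r'https?://[^\s<>"\')\]]+')


def extract_urls(text):
    """Return the unique URLs found in text, in order of first appearance."""
    urls = []
    for match in URL_PATTERN.findall(text):
        url = match.rstrip('.,;:')  # drop trailing sentence punctuation
        if url and url not in urls:
            urls.append(url)
    return urls


def urls_from_pdf(pdf_path):
    """Convert a PDF to text with pdftotext and return the URLs it contains."""
    # "-" sends the text to stdout, so no temporary txt file has to be
    # located and deleted afterwards.
    result = subprocess.run(['pdftotext', '-raw', pdf_path, '-'],
                            capture_output=True, text=True, check=True)
    return extract_urls(result.stdout)
```

With pdftotext installed, `urls_from_pdf('ml.pdf')` would return the URLs as a list instead of printing matching lines.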

Query SRA Sequence Runs with Python

Retrieving data from the Sequence Read Archive (SRA) is a common task. NCBI provides a collection of tools named E-utilities to query and retrieve data from it. The example Python snippet below shows how to query the NCBI SRA database using sample identifiers and get a table of the linked NCBI BioProject, BioSample, Run, download location, and size.

import sys, os                                                                                                                                                                                     
import subprocess                                                                                                                                                                                  
import shlex                                                                                                                                                                                       
import pandas as pd 
.....


def get_SRR_from_biosamples(csv: str, batch_size=10, debug=True):
    """Get SRA run IDs from BioSample IDs.
    """
    epost_cmd = 'epost -db biosample -format acc'                                                                                                                                                  
    elink_cmd = 'elink -target sra'                                                                                                                                                                
    efetch_cmd = 'efetch -db sra -format runinfo -mode xml'
    xtract_cmd = ('xtract -pattern Row -def "NA" -element BioProject '
                  'BioSample Run download_path size_MB')
                                                                                                                                                                                                   
    sample_ids = []                                                                                                                                                                                
    results = []                                                                                                                                                                                   
                                                                                                                                                                                                   
    with open(csv, 'r') as fh:                                                                                                                                                                     
        total_samples = fh.readlines()                                                                                                                                                             
        print(f'Total samples: {len(total_samples)}')
        for idx, l in enumerate(total_samples):                                                                                                                                                    
            l = l.rstrip()                                                                                                                                                                         
            sample_ids.append(l)                                                                                                                                                                   
            batch_num = idx // batch_size + 1
            run_flag = True                                                                                                                                                                        
            if debug:                                                                                                                                                                              
                if batch_num > 1:                                                                                                                                                                  
                    print('Debug mode. Stop execution after 1 batch.')                                                                                                                             
                    run_flag = None                                                                                                                                                                
                    break                                                                                                                                                                          
            if run_flag:                                                                                                                                                                           
                if (idx + 1) % batch_size == 0 or idx == len(total_samples) - 1:
                    print(f'Processing batch {batch_num}: {sample_ids}')                                                                                                                           
                    batch_results = []                                                                                                                                                             
                                                                                                                                                                                                   
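                    sample_ids = ','.join(sample_ids)
                    # Build the command per batch; appending to epost_cmd
                    # would accumulate -id arguments across batches.
                    batch_epost_cmd = f'{epost_cmd} -id "{sample_ids}"'
                    epost = subprocess.Popen(shlex.split(batch_epost_cmd),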
                                             stdout=subprocess.PIPE,                                                                                                                               
                                             encoding='utf8')                                                                                                                                      
                    elink = subprocess.Popen(shlex.split(elink_cmd),                                                                                                                               
                                             stdin=epost.stdout,                                                                                                                                   
                                             stdout=subprocess.PIPE,                                                                                                                               
                                             encoding='utf8')                                                                                                                                      
                    efetch = subprocess.Popen(shlex.split(efetch_cmd),                                                                                                                             
                                              stdin=elink.stdout,                                                                                                                                  
                                              stdout=subprocess.PIPE,                                                                                                                              
                                              encoding='utf8')                                                                                                                                     
                    xtract = subprocess.Popen(shlex.split(xtract_cmd),                                                                                                                             
                                              stdin=efetch.stdout,                                                                                                                                 
                                              stdout=subprocess.PIPE,                                                                                                                              
                                              encoding='utf8')                                                                                                                                     
                                                                                                                                                                                                   
                    # Block until epost exits instead of busy-polling.
                    epost.wait()
                                                                                                                                                                                                   
                    for l in xtract.stdout.readlines():                                                                                                                                            
                        if not l.startswith('PRJ'):  # "502 Bad Gateway" when server is busy                                                                                                       
                            sys.stderr.write(f'Error processing {sample_ids}: {l}')                                                                                                                
                        else:                                                                                                                                                                      
                            if debug:                                                                                                                                                              
                                print(l.rstrip())                                                                                                                                                  
                            batch_results.append(l.split())                                                                                                                                        
                    print(f'\nTotal SRA Runs in batch {batch_num}: {len(batch_results)}.\n')                                                                                                       
                    results.extend(batch_results)                                                                                                                                                  
                    sample_ids = []                                                                                                                                                                
    print(f'Total runs in collection: {len(results)} with {idx+1} samples.')                                                                                                                       
                                                                                                                                                                                                   
    data = pd.DataFrame(results, columns=['BioProject', 'BioSample', 'Run', 'Download', 'size_MB'])                                                                                                
    return data                                                                                      

The following E-utilities tools need to be accessible from the environment: epost, elink, efetch, and xtract. The subprocess module in Python chains these steps together, similar to Linux pipes. The samples are queried in batches to avoid sending queries to NCBI too frequently, which could get your future queries blocked. After receiving the run identifiers, one can use the prefetch tool from the SRA Toolkit to download the files. And, of course, prefetch can be wrapped and chained together as well.
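
The batching and the pipe chaining can also be factored into two small helpers. This is a sketch under stated assumptions, not the method used in the function above: `batches` and `run_pipeline` are hypothetical names, and the demo at the bottom uses stand-in Python commands so it runs without the NCBI tools installed.

```python
import shlex
import subprocess
import sys


def batches(items, batch_size=10):
    """Yield consecutive slices of items with at most batch_size entries."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def run_pipeline(commands):
    """Run commands like a shell pipeline (cmd1 | cmd2 | ...).

    Each command is a string (split with shlex) or an argument list.
    Returns the standard output of the last command as text.
    """
    procs = []
    prev_stdout = None
    for cmd in commands:
        args = shlex.split(cmd) if isinstance(cmd, str) else list(cmd)
        proc = subprocess.Popen(args, stdin=prev_stdout,
                                stdout=subprocess.PIPE, encoding='utf8')
        if prev_stdout is not None:
            # Close the parent's copy of the pipe so the upstream process
            # sees a broken pipe if the downstream one exits early.
            prev_stdout.close()
        prev_stdout = proc.stdout
        procs.append(proc)
    output, _ = procs[-1].communicate()
    for proc in procs[:-1]:
        proc.wait()
    return output


if __name__ == '__main__':
    # Stand-in commands: print two lines, then sort them.
    producer = [sys.executable, '-c', "print('b'); print('a')"]
    sorter = [sys.executable, '-c',
              "import sys; sys.stdout.write(''.join(sorted(sys.stdin)))"]
    print(run_pipeline([producer, sorter]), end='')
```

For the SRA query, each batch from `batches(sample_ids, batch_size)` would be joined with commas and passed to `run_pipeline` as the epost command with its `-id` argument, followed by the elink, efetch, and xtract commands from the function above.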