fixed problems with BAM input and variant-calling only workflow scripts
cwhelan committed Aug 20, 2013
1 parent 747dab8 commit 326c9a5
Showing 7 changed files with 56 additions and 34 deletions.
@@ -112,8 +112,8 @@ protected static JCommander buildJCommander() {
 CommandReadSAMFileIntoHDFS readSamFile = new CommandReadSAMFileIntoHDFS();
 jc.addCommand("readSAMFileIntoHDFS", readSamFile);
 
-// CommandPrepSAMRecords commandPrepSAMRecords = new CommandPrepSAMRecords();
-// jc.addCommand("prepSAMRecords", commandPrepSAMRecords);
+CommandPrepSAMRecords commandPrepSAMRecords = new CommandPrepSAMRecords();
+jc.addCommand("prepSAMRecords", commandPrepSAMRecords);
 
 // Alignment commands
 CommandBWAPairedEnds bwaPairedEnds = new CommandBWAPairedEnds();
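For context on the newly enabled command: JCommander dispatches subcommands that are registered with addCommand, as in the hunk above. A minimal, self-contained sketch of that pattern follows; it is not Cloudbreak's code, and the DispatchSketch class, its parameter fields, and the hard-coded argument values are illustrative assumptions.

// Hedged sketch: how a JCommander CLI registers and dispatches a subcommand
// like "prepSAMRecords". Everything here is illustrative, not Cloudbreak's code.
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;

public class DispatchSketch {

    @Parameters(commandDescription = "Prepare SAM records for the pipeline")
    static class CommandPrepSAMRecords {
        @Parameter(names = {"--inputHDFSDir"}, required = true)
        String inputHDFSDir;

        @Parameter(names = {"--outputHDFSDir"}, required = true)
        String outputHDFSDir;
    }

    public static void main(String[] args) {
        CommandPrepSAMRecords prep = new CommandPrepSAMRecords();
        JCommander jc = new JCommander();
        jc.addCommand("prepSAMRecords", prep);   // same registration call as in the diff
        jc.parse("prepSAMRecords",
                "--inputHDFSDir", "/alignments_import",
                "--outputHDFSDir", "/alignments");
        if ("prepSAMRecords".equals(jc.getParsedCommand())) {
            System.out.println(prep.inputHDFSDir + " -> " + prep.outputHDFSDir);
        }
    }
}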
@@ -5,6 +5,7 @@
 import edu.ohsu.sonmezsysbio.cloudbreak.mapper.SingleEndAlignmentSummaryMapper;
 //import edu.ohsu.sonmezsysbio.cloudbreak.reducer.SingleEndAlignmentSummaryReducer;
 //import edu.ohsu.sonmezsysbio.cloudbreak.reducer.SingleEndAlignmentsToPairsReducer;
+import edu.ohsu.sonmezsysbio.cloudbreak.reducer.AlignmentsToPairsReducer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -53,7 +54,7 @@ private void runHadoopJob(Configuration configuration) throws IOException {
 conf.setMapOutputKeyClass(Text.class);
 conf.setMapOutputValueClass(Text.class);
 
-//conf.setReducerClass(SingleEndAlignmentsToPairsReducer.class);
+conf.setReducerClass(AlignmentsToPairsReducer.class);
 
 conf.setOutputFormat(SequenceFileOutputFormat.class);
 conf.setOutputKeyClass(Text.class);
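The hunk above slots AlignmentsToPairsReducer into an old-API (org.apache.hadoop.mapred) job. As a hedged sketch of where setReducerClass sits in such a configuration, with Hadoop's stock IdentityMapper and IdentityReducer standing in for Cloudbreak's own classes, and the job name and paths as placeholders:

// Hedged sketch of an old-API (mapred) Hadoop job configuration. Only the
// conf.set* call sites mirror the diff; the classes plugged in are stand-ins.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class JobSketch {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(JobSketch.class);
        conf.setJobName("pairAlignments");

        conf.setMapperClass(IdentityMapper.class);
        conf.setMapOutputKeyClass(Text.class);   // as in the diff
        conf.setMapOutputValueClass(Text.class); // as in the diff

        conf.setReducerClass(IdentityReducer.class); // Cloudbreak plugs in AlignmentsToPairsReducer here

        conf.setOutputFormat(SequenceFileOutputFormat.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}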
@@ -51,7 +51,7 @@ public class CommandReadPairedEndFilesIntoHDFS implements CloudbreakCommand {
 @Parameter(names = {"--filterBasedOnCasava18Flags"}, description = "Use the CASAVA 1.8 QC filter to filter out read pairs")
 boolean casava18filter = false;
 
-@Parameter(names = {"--filesInHDFS"}, description = "Use this flag if the BAM file has already been copied into HDFS")
+@Parameter(names = {"--filesInHDFS"}, description = "Use this flag if the FASTQ files have already been copied into HDFS")
 boolean filesInHDFS = false;
 
 private long numRecords;
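The corrected description above belongs to a boolean JCommander option, which acts as a presence flag: passing the bare option flips the field to true. A tiny sketch, assuming a throwaway FlagSketch class:

// Hedged sketch: a boolean @Parameter needs no value; the bare flag sets it true.
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;

public class FlagSketch {
    @Parameter(names = {"--filesInHDFS"},
            description = "Use this flag if the FASTQ files have already been copied into HDFS")
    boolean filesInHDFS = false;

    public static void main(String[] args) {
        FlagSketch opts = new FlagSketch();
        new JCommander(opts).parse("--filesInHDFS");
        System.out.println(opts.filesInHDFS); // prints: true
    }
}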
@@ -15,10 +15,7 @@
 import org.apache.hadoop.io.compress.SnappyCodec;
 import org.apache.log4j.Logger;
 
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
+import java.io.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.zip.GZIPOutputStream;
@@ -46,22 +43,14 @@ public class CommandReadSAMFileIntoHDFS implements CloudbreakCommand {
 @Parameter(names = {"--compress"}, description = "Compression codec to use for the data")
 String compress = "snappy";
 
+@Parameter(names = {"--filesInHDFS"}, description = "Use this flag if the BAM file has already been copied into HDFS")
+boolean filesInHDFS = false;
+
 public void run(Configuration conf) throws Exception {
 Configuration config = new Configuration();
 
 FileSystem hdfs = FileSystem.get(config);
 Path p = new Path(hdfsDataDir + "/" + outFileName);
 
-//HDFSWriter writer = getHdfsWriter(config, hdfs, p);
-
-// try {
-//     readFile(writer, samFile);
-// } finally {
-//     writer.close();
-// }
-
-readFile(null, samFile, config, hdfs);
+readFile(samFile, config, hdfs);
 }
 
 static HDFSWriter getHdfsWriter(Configuration config, FileSystem hdfs, Path p, String compress) throws IOException {
@@ -83,17 +72,25 @@ static HDFSWriter getHdfsWriter(Configuration config, FileSystem hdfs, Path p, String compress) throws IOException {
 return writer;
 }
 
-private void readFile(HDFSWriter writer, String samFile, Configuration config, FileSystem hdfs) throws IOException {
+private void readFile(String samFile, Configuration config, FileSystem hdfs) throws IOException {
 ExecutorService executorService = Executors.newFixedThreadPool(2);
-SAMFileReader samFileReader = new SAMFileReader(new File(samFile));
+InputStream inputStream;
+if (filesInHDFS) {
+    inputStream = hdfs.open(new Path(samFile));
+} else {
+    inputStream = new FileInputStream(samFile);
+}
+
+SAMFileReader samFileReader = new SAMFileReader(inputStream);
 samFileReader.setValidationStringency(SAMFileReader.ValidationStringency.SILENT);
 SAMRecordIterator it = samFileReader.iterator();
 
 String currentReadName = "";
 int numRecords = 0;
 long lastTime = System.currentTimeMillis();
 Text key = new Text();
-KeyVal[] buffer = new KeyVal[10000000];
+int bufferSize = 500000;
+KeyVal[] buffer = new KeyVal[bufferSize];
 while (it.hasNext()) {
 SAMRecord samRecord = it.next();
 String readName = samRecord.getReadName();
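The replacement readFile above opens the SAM/BAM either from HDFS or from the local filesystem behind a single InputStream, which is what lets the new --filesInHDFS flag work. A self-contained sketch of the same pattern, with the OpenSketch class and the example paths as assumptions:

// Hedged sketch of the HDFS-or-local InputStream pattern used above:
// FileSystem.open() yields an FSDataInputStream (an InputStream subclass),
// so downstream readers never need to know where the bytes come from.
import java.io.FileInputStream;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OpenSketch {
    static InputStream open(String samFile, boolean filesInHDFS) throws Exception {
        if (filesInHDFS) {
            FileSystem hdfs = FileSystem.get(new Configuration());
            return hdfs.open(new Path(samFile));   // e.g. /user/cloudbreak/sample.bam
        }
        return new FileInputStream(samFile);       // local path
    }

    public static void main(String[] args) throws Exception {
        try (InputStream in = open(args[0], args.length > 1)) {
            System.out.println("first byte: " + in.read());
        }
    }
}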
@@ -103,9 +100,9 @@ private void readFile(HDFSWriter writer, String samFile, Configuration config, FileSystem hdfs) throws IOException {
 KeyVal keyVal = new KeyVal();
 keyVal.key = currentReadName;
 keyVal.val = samRecord.getSAMString();
-buffer[numRecords] = keyVal;
+buffer[numRecords % bufferSize] = keyVal;
 numRecords++;
-if (numRecords % 10000000 == 0) {
+if (numRecords % bufferSize == 0) {
 long currentTime = System.currentTimeMillis();
 System.err.println("Loaded " + numRecords + " in " + (currentTime - lastTime) + "ms");
 lastTime = currentTime;
@@ -116,10 +113,19 @@ private void readFile(HDFSWriter writer, String samFile, Configuration config, FileSystem hdfs) throws IOException {
 uploadThread.path = p;
 uploadThread.config = config;
 uploadThread.hdfs = hdfs;
-buffer = new KeyVal[10000000];
+buffer = new KeyVal[bufferSize];
 executorService.execute(uploadThread);
 }
 }
+System.err.println("finished reading the file, found " + numRecords);
+Path p = new Path(hdfsDataDir + "/" + outFileName + "-" + numRecords);
+UploadThread uploadThread = new UploadThread();
+uploadThread.recordNum = numRecords;
+uploadThread.buff = buffer;
+uploadThread.path = p;
+uploadThread.config = config;
+uploadThread.hdfs = hdfs;
+executorService.execute(uploadThread);
 executorService.shutdown();
 while (!executorService.isTerminated()) {
 }
@@ -146,7 +152,7 @@ public void run() {
 
 try {
 writer = getHdfsWriter(config, hdfs, path, CommandReadSAMFileIntoHDFS.this.compress);
-for (int i = 0; i < buff.length; i++) {
+for (int i = 0; i <= (recordNum - 1) % buff.length; i++) {
 writer.write(new Text(buff[i].key), buff[i].val);
 }
 
@@ -159,7 +165,6 @@
 e.printStackTrace();
 }
 }
-System.out.println("Loaded " + recordNum);
 
 }
 }
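Taken together, the changes in this file swap one huge 10,000,000-slot array for 500,000-record chunks that are handed to an upload thread each time they fill, add a final flush for the partially filled tail chunk (which the modulo condition inside the loop alone would never write out), and bound the upload loop with i <= (recordNum - 1) % buff.length so only the tail's occupied slots are written. A standalone sketch of that arithmetic, with every name local to the example:

// Hedged sketch of the chunked-flush arithmetic from the diff above:
// records land in buffer[n % size]; a full buffer is flushed when
// n % size == 0, and after the loop the tail is flushed with the bound
// i <= (total - 1) % size, which equals size - 1 for a full final chunk
// and (tail count - 1) otherwise.
public class ChunkSketch {
    public static void main(String[] args) {
        int size = 5;                 // stands in for bufferSize = 500000
        String[] buffer = new String[size];
        int n = 0;

        for (int rec = 0; rec < 13; rec++) {   // 13 fake records: 5 + 5 + 3
            buffer[n % size] = "rec" + rec;
            n++;
            if (n % size == 0) {
                flush(buffer, n, size);        // full chunk
                buffer = new String[size];
            }
        }
        flush(buffer, n, size);                // final partial chunk (3 records)
        // (if total were an exact multiple of size, this last call would scan a
        // freshly cleared buffer; an edge case this sketch does not exercise)
    }

    static void flush(String[] buff, int total, int size) {
        // same bound as the fixed UploadThread loop
        for (int i = 0; i <= (total - 1) % size; i++) {
            System.out.println("write " + buff[i]);
        }
        System.out.println("-- chunk flushed at record " + total);
    }
}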
@@ -29,9 +29,11 @@ public void reduce(Text key, Iterator<Text> values,
 while (values.hasNext()) {
 String alignment = values.next().toString();
 String[] fields = alignment.split("\t");
-if ((Integer.valueOf(fields[1]) & 0x40) > 0) {
+Integer flag = Integer.valueOf(fields[1]);
+if ((flag & 0x4) > 0) continue;
+if ((flag & 0x40) > 0) {
 read1Alignments.add(alignment);
-} else if ((Integer.valueOf(fields[1]) & 0x80) > 0) {
+} else if ((flag & 0x80) > 0) {
 read2Alignments.add(alignment);
 } else {
 throw new RuntimeException("bad line: " + alignment);
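The fix above caches the SAM FLAG field and skips unmapped alignments before routing by pairing bit; per the SAM spec, 0x4 means segment unmapped, 0x40 first in pair, and 0x80 second in pair. A small self-contained sketch of the same bit tests on fabricated tab-delimited SAM lines:

// Hedged sketch of the SAM FLAG checks from the reducer fix above.
// Per the SAM spec: 0x4 = read unmapped, 0x40 = first of pair, 0x80 = second of pair.
import java.util.ArrayList;
import java.util.List;

public class FlagBitsSketch {
    public static void main(String[] args) {
        List<String> read1Alignments = new ArrayList<String>();
        List<String> read2Alignments = new ArrayList<String>();

        // fake minimal SAM lines: QNAME, FLAG, RNAME (tab-delimited)
        String[] alignments = {
            "readA\t99\tchr1",   // 99  = 0x63: paired, mapped, first of pair
            "readA\t147\tchr1",  // 147 = 0x93: paired, mapped, second of pair
            "readB\t69\t*",      // 69  = 0x45: 0x4 set -> unmapped, skipped
        };

        for (String alignment : alignments) {
            String[] fields = alignment.split("\t");
            int flag = Integer.valueOf(fields[1]);
            if ((flag & 0x4) > 0) continue;          // unmapped: skip
            if ((flag & 0x40) > 0) {
                read1Alignments.add(alignment);      // first read in the pair
            } else if ((flag & 0x80) > 0) {
                read2Alignments.add(alignment);      // second read in the pair
            } else {
                throw new RuntimeException("bad line: " + alignment);
            }
        }
        System.out.println(read1Alignments.size() + " read-1, "
                + read2Alignments.size() + " read-2 alignments");
    }
}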
14 changes: 12 additions & 2 deletions src/main/scripts/Cloudbreak-EC2-whirr-variants-only.sh
@@ -64,6 +64,9 @@ echo "File: $CLOUDBREAK_JAR"
 hadoop jar $CLOUDBREAK_JAR copyToS3 --S3Bucket $MY_BUCKET_NAME --fileName $CLOUDBREAK_JAR
 echo "File: $REFERENCE_FAIDX"
 hadoop jar $CLOUDBREAK_JAR copyToS3 --S3Bucket $MY_BUCKET_NAME --fileName $REFERENCE_FAIDX
+echo "File: $BAM_FILE"
+hadoop jar $CLOUDBREAK_JAR copyToS3 --S3Bucket $MY_BUCKET_NAME --fileName $BAM_FILE
+
 
 # launch cluster with whirr
 echo "=================================="
@@ -90,9 +93,16 @@ echo "=================================="
 echo "Reading alignments into HDFS"
 echo "=================================="
 time hadoop jar $CLOUDBREAK_HOME/cloudbreak-${project.version}.jar readSAMFileIntoHDFS \
---HDFSDataDir $HDFS_EXPERIMENT_DIR/alignments/ \
---samFile $BAM_FILE
+--HDFSDataDir $HDFS_EXPERIMENT_DIR/alignments_import/ \
+--samFile /user/cloudbreak/$BAM_FILE \
+--filesInHDFS
 
+echo "=================================="
+echo "Prepping alignments for Cloudbreak"
+echo "=================================="
+time hadoop jar $CLOUDBREAK_HOME/cloudbreak-${project.version}.jar -Dmapred.reduce.tasks=25 prepSAMRecords \
+--inputHDFSDir $HDFS_EXPERIMENT_DIR/alignments_import/ \
+--outputHDFSDir $HDFS_EXPERIMENT_DIR/alignments/
 
 echo "=================================="
 echo "Creating a readgroup file"
Expand Down
10 changes: 7 additions & 3 deletions src/main/scripts/Cloudbreak-variants-only.sh
@@ -57,12 +57,16 @@ GENOTYPING_ALPHA_THRESHOLD=.35
 # experiment name
 NAME=cloudbreak_${LIBRARY_NAME}_${READ_GROUP_NAME}
 
 # read BAM file into HDFS
 echo "reading BAM file into HDFS"
-time hadoop jar $CLOUDBREAK_HOME/cloudbreak-${project.version}.jar readSAMFileIntoHDFS \
---HDFSDataDir $HDFS_EXPERIMENT_DIR/alignments/ \
+time HADOOP_HEAP_SIZE=4000 hadoop jar $CLOUDBREAK_HOME/cloudbreak-${project.version}.jar readSAMFileIntoHDFS \
+--HDFSDataDir $HDFS_EXPERIMENT_DIR/alignments_import/ \
 --samFile $BAM_FILE
 
+echo "preparing the reads for Cloudbreak"
+time hadoop jar $CLOUDBREAK_HOME/cloudbreak-${project.version}.jar -Dmapred.reduce.tasks=25 prepSAMRecords \
+--inputHDFSDir $HDFS_EXPERIMENT_DIR/alignments_import/ \
+--outputHDFSDir $HDFS_EXPERIMENT_DIR/alignments/
+
 # write a read group info file and copy into HDFS
 echo "creating readgroup file"
 echo "$READ_GROUP_NAME $LIBRARY_NAME $INSERT_SIZE $INSERT_SIZE_SD false $HDFS_EXPERIMENT_DIR/alignments" >> readGroupInfo.txt
