diff --git a/src/main/java/edu/ohsu/sonmezsysbio/cloudbreak/Cloudbreak.java b/src/main/java/edu/ohsu/sonmezsysbio/cloudbreak/Cloudbreak.java
index 71392aa..8aa771a 100644
--- a/src/main/java/edu/ohsu/sonmezsysbio/cloudbreak/Cloudbreak.java
+++ b/src/main/java/edu/ohsu/sonmezsysbio/cloudbreak/Cloudbreak.java
@@ -112,8 +112,8 @@ protected static JCommander buildJCommander() {
         CommandReadSAMFileIntoHDFS readSamFile = new CommandReadSAMFileIntoHDFS();
         jc.addCommand("readSAMFileIntoHDFS", readSamFile);
 
-//        CommandPrepSAMRecords commandPrepSAMRecords = new CommandPrepSAMRecords();
-//        jc.addCommand("prepSAMRecords", commandPrepSAMRecords);
+        CommandPrepSAMRecords commandPrepSAMRecords = new CommandPrepSAMRecords();
+        jc.addCommand("prepSAMRecords", commandPrepSAMRecords);
 
         // Alignment commands
         CommandBWAPairedEnds bwaPairedEnds = new CommandBWAPairedEnds();
diff --git a/src/main/java/edu/ohsu/sonmezsysbio/cloudbreak/command/CommandPrepSAMRecords.java b/src/main/java/edu/ohsu/sonmezsysbio/cloudbreak/command/CommandPrepSAMRecords.java
index 51d62ac..2050fcf 100644
--- a/src/main/java/edu/ohsu/sonmezsysbio/cloudbreak/command/CommandPrepSAMRecords.java
+++ b/src/main/java/edu/ohsu/sonmezsysbio/cloudbreak/command/CommandPrepSAMRecords.java
@@ -5,6 +5,7 @@ import edu.ohsu.sonmezsysbio.cloudbreak.mapper.SingleEndAlignmentSummaryMapper;
 //import edu.ohsu.sonmezsysbio.cloudbreak.reducer.SingleEndAlignmentSummaryReducer;
 //import edu.ohsu.sonmezsysbio.cloudbreak.reducer.SingleEndAlignmentsToPairsReducer;
+import edu.ohsu.sonmezsysbio.cloudbreak.reducer.AlignmentsToPairsReducer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -53,7 +54,7 @@ private void runHadoopJob(Configuration configuration) throws IOException {
         conf.setMapOutputKeyClass(Text.class);
         conf.setMapOutputValueClass(Text.class);
 
-        //conf.setReducerClass(SingleEndAlignmentsToPairsReducer.class);
+        conf.setReducerClass(AlignmentsToPairsReducer.class);
 
         conf.setOutputFormat(SequenceFileOutputFormat.class);
         conf.setOutputKeyClass(Text.class);
diff --git a/src/main/java/edu/ohsu/sonmezsysbio/cloudbreak/command/CommandReadPairedEndFilesIntoHDFS.java b/src/main/java/edu/ohsu/sonmezsysbio/cloudbreak/command/CommandReadPairedEndFilesIntoHDFS.java
index 1e79e09..8acfbe7 100644
--- a/src/main/java/edu/ohsu/sonmezsysbio/cloudbreak/command/CommandReadPairedEndFilesIntoHDFS.java
+++ b/src/main/java/edu/ohsu/sonmezsysbio/cloudbreak/command/CommandReadPairedEndFilesIntoHDFS.java
@@ -51,7 +51,7 @@ public class CommandReadPairedEndFilesIntoHDFS implements CloudbreakCommand {
     @Parameter(names = {"--filterBasedOnCasava18Flags"}, description = "Use the CASAVA 1.8 QC filter to filter out read pairs")
     boolean casava18filter = false;
 
-    @Parameter(names = {"--filesInHDFS"}, description = "Use this flag if the BAM file has already been copied into HDFS")
+    @Parameter(names = {"--filesInHDFS"}, description = "Use this flag if the FASTQ files have already been copied into HDFS")
     boolean filesInHDFS = false;
 
     private long numRecords;
diff --git a/src/main/java/edu/ohsu/sonmezsysbio/cloudbreak/command/CommandReadSAMFileIntoHDFS.java b/src/main/java/edu/ohsu/sonmezsysbio/cloudbreak/command/CommandReadSAMFileIntoHDFS.java
index 73b4259..1db4367 100644
--- a/src/main/java/edu/ohsu/sonmezsysbio/cloudbreak/command/CommandReadSAMFileIntoHDFS.java
+++ b/src/main/java/edu/ohsu/sonmezsysbio/cloudbreak/command/CommandReadSAMFileIntoHDFS.java
@@ -15,10 +15,7 @@ import org.apache.hadoop.io.compress.SnappyCodec;
 import org.apache.log4j.Logger;
 
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
+import java.io.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.zip.GZIPOutputStream;
 
@@ -46,22 +43,14 @@ public class CommandReadSAMFileIntoHDFS implements CloudbreakCommand {
 
     @Parameter(names = {"--compress"}, description = "Compression codec to use for the data")
     String compress = "snappy";
 
+    @Parameter(names = {"--filesInHDFS"}, description = "Use this flag if the BAM file has already been copied into HDFS")
+    boolean filesInHDFS = false;
 
     public void run(Configuration conf) throws Exception {
         Configuration config = new Configuration();
         FileSystem hdfs = FileSystem.get(config);
-        Path p = new Path(hdfsDataDir + "/" + outFileName);
-
-        //HDFSWriter writer = getHdfsWriter(config, hdfs, p);
-
-//        try {
-//            readFile(writer, samFile);
-//        } finally {
-//            writer.close();
-//        }
-
-        readFile(null, samFile, config, hdfs);
+        readFile(samFile, config, hdfs);
     }
 
     static HDFSWriter getHdfsWriter(Configuration config, FileSystem hdfs, Path p, String compress) throws IOException {
@@ -83,9 +72,16 @@ static HDFSWriter getHdfsWriter(Configuration config, FileSystem hdfs, Path p, String compress) throws IOException {
         return writer;
     }
 
-    private void readFile(HDFSWriter writer, String samFile, Configuration config, FileSystem hdfs) throws IOException {
+    private void readFile(String samFile, Configuration config, FileSystem hdfs) throws IOException {
         ExecutorService executorService = Executors.newFixedThreadPool(2);
-        SAMFileReader samFileReader = new SAMFileReader(new File(samFile));
+        InputStream inputStream;
+        if (filesInHDFS) {
+            inputStream = hdfs.open(new Path(samFile));
+        } else {
+            inputStream = new FileInputStream(samFile);
+        }
+
+        SAMFileReader samFileReader = new SAMFileReader(inputStream);
         samFileReader.setValidationStringency(SAMFileReader.ValidationStringency.SILENT);
 
         SAMRecordIterator it = samFileReader.iterator();
@@ -93,7 +89,8 @@ private void readFile(HDFSWriter writer, String samFile, Configuration config, FileSystem hdfs) throws IOException {
         int numRecords = 0;
         long lastTime = System.currentTimeMillis();
         Text key = new Text();
-        KeyVal[] buffer = new KeyVal[10000000];
+        int bufferSize = 500000;
+        KeyVal[] buffer = new KeyVal[bufferSize];
         while (it.hasNext()) {
             SAMRecord samRecord = it.next();
             String readName = samRecord.getReadName();
@@ -103,9 +100,9 @@ private void readFile(HDFSWriter writer, String samFile, Configuration config, FileSystem hdfs) throws IOException {
             KeyVal keyVal = new KeyVal();
             keyVal.key = currentReadName;
             keyVal.val = samRecord.getSAMString();
-            buffer[numRecords] = keyVal;
+            buffer[numRecords % bufferSize] = keyVal;
             numRecords++;
-            if (numRecords % 10000000 == 0) {
+            if (numRecords % bufferSize == 0) {
                 long currentTime = System.currentTimeMillis();
                 System.err.println("Loaded " + numRecords + " in " + (currentTime - lastTime) + "ms");
                 lastTime = currentTime;
@@ -116,10 +113,23 @@ private void readFile(HDFSWriter writer, String samFile, Configuration config, FileSystem hdfs) throws IOException {
                 uploadThread.path = p;
                 uploadThread.config = config;
                 uploadThread.hdfs = hdfs;
-                buffer = new KeyVal[10000000];
+                buffer = new KeyVal[bufferSize];
                 executorService.execute(uploadThread);
             }
         }
+        System.err.println("finished reading the file, found " + numRecords);
+        // flush the final, partially filled buffer; when numRecords is an exact
+        // multiple of bufferSize the loop above has already uploaded everything,
+        // and the fresh empty buffer must not be written out
+        if (numRecords % bufferSize != 0) {
+            Path p = new Path(hdfsDataDir + "/" + outFileName + "-" + numRecords);
+            UploadThread uploadThread = new UploadThread();
+            uploadThread.recordNum = numRecords;
+            uploadThread.buff = buffer;
+            uploadThread.path = p;
+            uploadThread.config = config;
+            uploadThread.hdfs = hdfs;
+            executorService.execute(uploadThread);
+        }
         executorService.shutdown();
         while (!executorService.isTerminated()) {
         }
@@ -146,7 +156,7 @@ public void run() {
             try {
                 writer = getHdfsWriter(config, hdfs, path, CommandReadSAMFileIntoHDFS.this.compress);
 
-                for (int i = 0; i < buff.length; i++) {
+                for (int i = 0; i <= (recordNum - 1) % buff.length; i++) {
                     writer.write(new Text(buff[i].key), buff[i].val);
                 }
 
@@ -159,7 +169,6 @@ public void run() {
                     e.printStackTrace();
                 }
             }
-            System.out.println("Loaded " + recordNum);
         }
     }
 
diff --git a/src/main/java/edu/ohsu/sonmezsysbio/cloudbreak/reducer/AlignmentsToPairsReducer.java b/src/main/java/edu/ohsu/sonmezsysbio/cloudbreak/reducer/AlignmentsToPairsReducer.java
index fc1e0b7..af626c3 100644
--- a/src/main/java/edu/ohsu/sonmezsysbio/cloudbreak/reducer/AlignmentsToPairsReducer.java
+++ b/src/main/java/edu/ohsu/sonmezsysbio/cloudbreak/reducer/AlignmentsToPairsReducer.java
@@ -29,9 +29,11 @@ public void reduce(Text key, Iterator<Text> values,
         while (values.hasNext()) {
             String alignment = values.next().toString();
             String[] fields = alignment.split("\t");
-            if ((Integer.valueOf(fields[1]) & 0x40) > 0) {
+            Integer flag = Integer.valueOf(fields[1]);
+            if ((flag & 0x4) > 0) continue;
+            if ((flag & 0x40) > 0) {
                 read1Alignments.add(alignment);
-            } else if ((Integer.valueOf(fields[1]) & 0x80) > 0) {
+            } else if ((flag & 0x80) > 0) {
                 read2Alignments.add(alignment);
             } else {
                 throw new RuntimeException("bad line: " + alignment);
diff --git a/src/main/scripts/Cloudbreak-EC2-whirr-variants-only.sh b/src/main/scripts/Cloudbreak-EC2-whirr-variants-only.sh
index 4d62b4a..9ac4f67 100755
--- a/src/main/scripts/Cloudbreak-EC2-whirr-variants-only.sh
+++ b/src/main/scripts/Cloudbreak-EC2-whirr-variants-only.sh
@@ -64,6 +64,9 @@ echo "File: $CLOUDBREAK_JAR"
 hadoop jar $CLOUDBREAK_JAR copyToS3 --S3Bucket $MY_BUCKET_NAME --fileName $CLOUDBREAK_JAR
 echo "File: $REFERENCE_FAIDX"
 hadoop jar $CLOUDBREAK_JAR copyToS3 --S3Bucket $MY_BUCKET_NAME --fileName $REFERENCE_FAIDX
+echo "File: $BAM_FILE"
+hadoop jar $CLOUDBREAK_JAR copyToS3 --S3Bucket $MY_BUCKET_NAME --fileName $BAM_FILE
+
 
 # launch cluster with whirr
 echo "=================================="
@@ -90,9 +93,16 @@ echo "=================================="
 echo "Reading alignments into HDFS"
 echo "=================================="
 time hadoop jar $CLOUDBREAK_HOME/cloudbreak-${project.version}.jar readSAMFileIntoHDFS \
-    --HDFSDataDir $HDFS_EXPERIMENT_DIR/alignments/ \
-    --samFile $BAM_FILE
+    --HDFSDataDir $HDFS_EXPERIMENT_DIR/alignments_import/ \
+    --samFile /user/cloudbreak/$BAM_FILE \
+    --filesInHDFS
 
+echo "=================================="
+echo "Prepping alignments for Cloudbreak"
+echo "=================================="
+time hadoop jar $CLOUDBREAK_HOME/cloudbreak-${project.version}.jar -Dmapred.reduce.tasks=25 prepSAMRecords \
+    --inputHDFSDir $HDFS_EXPERIMENT_DIR/alignments_import/ \
+    --outputHDFSDir $HDFS_EXPERIMENT_DIR/alignments/
 
 echo "=================================="
 echo "Creating a readgroup file"
diff --git a/src/main/scripts/Cloudbreak-variants-only.sh b/src/main/scripts/Cloudbreak-variants-only.sh
index e98d5af..ae11605 100755
--- a/src/main/scripts/Cloudbreak-variants-only.sh
+++ b/src/main/scripts/Cloudbreak-variants-only.sh
@@ -57,12 +57,16 @@ GENOTYPING_ALPHA_THRESHOLD=.35
 # experiment name
 NAME=cloudbreak_${LIBRARY_NAME}_${READ_GROUP_NAME}
 
-# read BAM file into HDFS
 echo "reading BAM file into HDFS"
-time hadoop jar $CLOUDBREAK_HOME/cloudbreak-${project.version}.jar readSAMFileIntoHDFS \
-    --HDFSDataDir $HDFS_EXPERIMENT_DIR/alignments/ \
+time HADOOP_HEAPSIZE=4000 hadoop jar $CLOUDBREAK_HOME/cloudbreak-${project.version}.jar readSAMFileIntoHDFS \
+    --HDFSDataDir $HDFS_EXPERIMENT_DIR/alignments_import/ \
     --samFile $BAM_FILE
 
+echo "preparing the reads for Cloudbreak"
+time hadoop jar $CLOUDBREAK_HOME/cloudbreak-${project.version}.jar -Dmapred.reduce.tasks=25 prepSAMRecords \
+    --inputHDFSDir $HDFS_EXPERIMENT_DIR/alignments_import/ \
+    --outputHDFSDir $HDFS_EXPERIMENT_DIR/alignments/
+
 # write a read group info file and copy into HDFS
 echo "creating readgroup file"
 echo "$READ_GROUP_NAME $LIBRARY_NAME $INSERT_SIZE $INSERT_SIZE_SD false $HDFS_EXPERIMENT_DIR/alignments" >> readGroupInfo.txt
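
A note on the AlignmentsToPairsReducer hunk above: the added (flag & 0x4) test drops unmapped alignments before the reads are bucketed into pair ends, relying on the standard SAM FLAG bits (0x4 = segment unmapped, 0x40 = first in pair, 0x80 = second in pair). The sketch below is a minimal, self-contained illustration of that classification logic, runnable outside Hadoop; the class and method names are illustrative only and not part of Cloudbreak.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone sketch of the SAM FLAG classification used by
// AlignmentsToPairsReducer: bucket alignment lines by read end,
// skipping unmapped records.
public class SamFlagPairingSketch {

    static final int UNMAPPED = 0x4;        // segment unmapped
    static final int FIRST_IN_PAIR = 0x40;  // first segment in the template
    static final int SECOND_IN_PAIR = 0x80; // last segment in the template

    static void classify(Iterable<String> alignments, List<String> read1, List<String> read2) {
        for (String alignment : alignments) {
            // column 2 of a SAM line (index 1 after splitting on tabs) is the FLAG
            int flag = Integer.parseInt(alignment.split("\t")[1]);
            if ((flag & UNMAPPED) > 0) continue; // drop unmapped alignments
            if ((flag & FIRST_IN_PAIR) > 0) {
                read1.add(alignment);
            } else if ((flag & SECOND_IN_PAIR) > 0) {
                read2.add(alignment);
            } else {
                throw new RuntimeException("bad line: " + alignment);
            }
        }
    }

    public static void main(String[] args) {
        List<String> read1 = new ArrayList<String>();
        List<String> read2 = new ArrayList<String>();
        classify(Arrays.asList(
                "readA\t99\tchr1\t100\t60\t...",   // 99 = paired, proper pair, first in pair
                "readA\t147\tchr1\t350\t60\t...",  // 147 = paired, proper pair, second in pair
                "readB\t69\t*\t0\t0\t..."),        // 69 = paired, unmapped, first in pair
                read1, read2);
        System.out.println(read1.size() + " read-1 / " + read2.size() + " read-2 alignments");
    }
}

A FLAG of 69 (1 + 4 + 64) is paired, unmapped, and first in pair, so the unmapped test short-circuits before the pairing tests are reached, matching the reducer's behavior of never pairing an unmapped record.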