package com.test;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.HttpURLConnection;
import java.net.URL;
/**
* 文件断点续传加分段上传线程
* @author wzztestin
*
*/
/**
* 文件断点续传加分段上传线程
* @author wzztestin
*
*/
public class DownFileFetch extends Thread {
DownFileInfoBean siteInfoBean = null; // 文件信息 Bean
long[] nStartPos; // 开始位置
long[] nEndPos; // 结束位置
DownFileSplitterFetch[] fileSplitterFetch; // 子线程对象
long nFileLength; // 文件长度
boolean bFirst = true; // 是否第一次取文件
boolean bStop = false; // 停止标志
File tmpFile; // 文件下载的临时信息
DataOutputStream output; // 输出到文件的输出流
boolean fileflag; //是本地上传还是远程下载的标志
File downfile; //本地文件下载
int splitter = 0;
/**
* 下载上传文件抓取初始化
* @param bean
* @throws IOException
*/
public DownFileFetch(DownFileInfoBean bean) throws IOException {
siteInfoBean = bean;
/**
* File.separator windows是\,unix是/
*/
tmpFile = new File(bean.getSFilePath() + File.separator
+ bean.getSFileName() + ".info");
if (tmpFile.exists()) {
bFirst = false;
//读取已下载的文件信息
read_nPos();
} else {
nStartPos = new long[bean.getNSplitter()];
nEndPos = new long[bean.getNSplitter()];
}
fileflag = bean.getFileflag();
downfile = bean.getDownfile();
this.splitter = bean.getNSplitter();
}
public void run() {
// 获得文件长度
// 分割文件
// 实例 FileSplitterFetch
// 启动 FileSplitterFetch 线程
// 等待子线程返回
try {
if (bFirst) {
nFileLength = getFileSize();
if (nFileLength == -1) {
DownFileUtility.log("File Length is not known!");
} else if (nFileLength == -2) {
DownFileUtility.log("File is not access!");
} else {
for (int i = 0; i < nStartPos.length; i++) {
nStartPos[i] = (long) (i * (nFileLength / nStartPos.length));
}
for (int i = 0; i < nEndPos.length - 1; i++) {
nEndPos[i] = nStartPos[i + 1];
}
nEndPos[nEndPos.length - 1] = nFileLength;
}
}
// 启动子线程
fileSplitterFetch = new DownFileSplitterFetch[nStartPos.length];
for (int i = 0; i < nStartPos.length; i++) {
fileSplitterFetch[i] = new DownFileSplitterFetch(
siteInfoBean.getSSiteURL(), siteInfoBean.getSFilePath()
+ File.separator + siteInfoBean.getSFileName()+"_"+i,
nStartPos[i], nEndPos[i], i,fileflag,downfile,bFirst);
DownFileUtility.log("Thread " + i + " , nStartPos = " + nStartPos[i]
+ ", nEndPos = " + nEndPos[i]);
fileSplitterFetch[i].start();
}
//下载子线程是否完成标志
boolean breakWhile = false;
while (!bStop) {
write_nPos();
DownFileUtility.sleep(500);
breakWhile = true;
for (int i = 0; i < nStartPos.length; i++) {
if (!fileSplitterFetch[i].bDownOver) {
breakWhile = false;
break;
}else{
write_nPos();
}
}
if (breakWhile){
break;
}
}
hebinfile(siteInfoBean.getSFilePath()+ File.separator + siteInfoBean.getSFileName(),splitter);
DownFileUtility.log("文件下载结束!");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 获得文件长度
* @return
*/
public long getFileSize() {
int nFileLength = -1;
if(fileflag){
try {
URL url = new URL(siteInfoBean.getSSiteURL());
HttpURLConnection httpConnection = (HttpURLConnection) url
.openConnection();
httpConnection.setRequestProperty("User-Agent", "NetFox");
int responseCode = httpConnection.getResponseCode();
if (responseCode >= 400) {
processErrorCode(responseCode);
//represent access is error
return -2;
}
String sHeader;
for (int i = 1;; i++) {
sHeader = httpConnection.getHeaderFieldKey(i);
if (sHeader != null) {
if (sHeader.equals("Content-Length")) {
nFileLength = Integer.parseInt(httpConnection
.getHeaderField(sHeader));
break;
}
} else {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
DownFileUtility.log(nFileLength);
}else{
try{
File myflie = downfile;
nFileLength = (int)myflie.length();
}catch(Exception e){
e.printStackTrace();
}
DownFileUtility.log(nFileLength);
}
return nFileLength;
}
/**
* 保存下载信息(文件指针位置)
*/
private void write_nPos() {
try {
output = new DataOutputStream(new FileOutputStream(tmpFile));
output.writeInt(nStartPos.length);
for (int i = 0; i < nStartPos.length; i++) {
output.writeLong(fileSplitterFetch[i].nStartPos);
output.writeLong(fileSplitterFetch[i].nEndPos);
}
output.close();
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 读取保存的下载信息(文件指针位置)
*/
private void read_nPos() {
try {
DataInputStream input = new DataInputStream(new FileInputStream(
tmpFile));
int nCount = input.readInt();
nStartPos = new long[nCount];
nEndPos = new long[nCount];
for (int i = 0; i < nStartPos.length; i++) {
nStartPos[i] = input.readLong();
nEndPos[i] = input.readLong();
}
input.close();
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 输出错误信息
* @param nErrorCode
*/
private void processErrorCode(int nErrorCode) {
DownFileUtility.log("Error Code : " + nErrorCode);
}
/**
* 停止文件下载
*/
public void siteStop() {
bStop = true;
for (int i = 0; i < nStartPos.length; i++)
fileSplitterFetch[i].splitterStop();
}
/**
* 合并文件
* @param sName
* @param splitternum
*/
private void hebinfile(String sName,int splitternum){
try{
File file = new File(sName);
if(file.exists()){
file.delete();
}
RandomAccessFile saveinput = new RandomAccessFile(sName,"rw");
for(int i = 0;i<splitternum;i++){
try {
RandomAccessFile input = new RandomAccessFile (new File(sName+"_"+i),"r");
byte[] b = new byte[1024];
int nRead;
while ((nRead = input.read(b, 0, 1024)) > 0) {
write(saveinput,b, 0, nRead);
}
input.close();
} catch (Exception e) {
e.printStackTrace();
}
}
DownFileUtility.log("file size is "+saveinput.length());
}catch(Exception e){
e.printStackTrace();
}
}
/**
* 写文件
* @param b
* @param nStart
* @param nLen
* @return
*/
private int write(RandomAccessFile oSavedFile,byte[] b, int nStart, int nLen) {
int n = -1;
try {
oSavedFile.seek(oSavedFile.length());
oSavedFile.write(b, nStart, nLen);
n = nLen;
} catch (IOException e) {
e.printStackTrace();
}
return n;
}
}
|
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
public class BlockingQueueTest {
public static void main(String[] args) {
Scanner in = new Scanner(System.in);
System.out.print("Enter base directory (e.g. /usr/local/jdk1.6.0/src): ");
String directory = in.nextLine();
System.out.print("Enter keyword (e.g. volatile): ");
String keyword = in.nextLine();
final int FILE_QUEUE_SIZE = 10;
final int SEARCH_THREADS = 100;
BlockingQueue queue = new ArrayBlockingQueue(FILE_QUEUE_SIZE);
FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory));
new Thread(enumerator).start();
for (int i = 1; i <= SEARCH_THREADS; i++) new Thread(new SearchTask(queue, keyword)).start(); } }
/**
* This task enumerates all files in a directory and its subdirectories.
*/
class FileEnumerationTask implements Runnable {
/**
* Constructs a FileEnumerationTask.
* @param queue the blocking queue to which the enumerated files are added
* @param startingDirectory the directory in which to start the enumeration
*/
public FileEnumerationTask(BlockingQueue queue, File startingDirectory) {
this.queue = queue; this.startingDirectory = startingDirectory;
}
public void run() {
try {
enumerate(startingDirectory);
queue.put(DUMMY);
} catch (
InterruptedException e) { }
}
/**
* Recursively enumerates all files in a given directory and its subdirectories
* @param directory the directory in which to start
*/
public void enumerate(File directory) throws InterruptedException {
File[] files = directory.listFiles();
for (File file : files) {
if (file.isDirectory())
enumerate(file);
else queue.put(file);
}
}
public static File DUMMY = new File("");
private BlockingQueue queue; private File startingDirectory;
}
/**
* This task searches files for a given keyword.
*/
class SearchTask implements Runnable {
/**
* Constructs a SearchTask.
* @param queue the queue from which to take files
* @param keyword the keyword to look for */
public SearchTask(BlockingQueue queue, String keyword) {
this.queue = queue; this.keyword = keyword; }
public void run() {
try {
boolean done = false;
while (!done) {
File file = queue.take();
if (file == FileEnumerationTask.DUMMY) {
queue.put(file); done = true;
} else
search(file);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) { }
}
/**
* Searches a file for a given keyword and prints all matching lines.
* @param file the file to search
*/
public void search(File file) throws IOException {
Scanner in = new Scanner(new FileInputStream(file));
int lineNumber = 0;
while (in.hasNextLine()) {
lineNumber++; String line = in.nextLine().trim();
if (line.contains(keyword)) System.out.printf("%s:%d %s%n", file.getPath(), lineNumber,line);
}
in.close();
}
private BlockingQueue queue;
private String keyword;
}
|