diff --git a/src/clientP2P/ClientDownloadPartUDP.java b/src/clientP2P/ClientDownloadPartUDP.java new file mode 100644 index 0000000..4ec0391 --- /dev/null +++ b/src/clientP2P/ClientDownloadPartUDP.java @@ -0,0 +1,297 @@ +package clientP2P; +import java.util.List; +import java.util.ArrayList; +import java.net.DatagramSocket; +import protocolP2P.ProtocolP2PPacketUDP; +import protocolP2P.Payload; +import protocolP2P.LoadRequest; +import protocolP2P.FilePart; +import exception.InternalError; +import remoteException.EmptyDirectory; +import remoteException.EmptyFile; +import exception.ProtocolError; +import remoteException.InternalRemoteError; +import remoteException.VersionRemoteError; +import exception.TransmissionError; +import remoteException.ProtocolRemoteError; +import exception.VersionError; +import exception.SizeError; +import remoteException.NotFound; +import java.nio.file.Files; +import java.io.File; +import java.nio.file.Paths; +import java.io.IOException; + +/** Class to download file parts on udp. +* @author Louis Royer +* @author Flavien Haas +* @author JS Auge +* @version 1.0 +*/ +public class ClientDownloadPartUDP implements Runnable { + + private List toDoTasks; + private List pendingTasks; + private List tasksDone; + private volatile boolean tasksListsLock; + private volatile boolean stop; + private volatile boolean failed; + private String filename; + private DatagramSocket socket; + private volatile boolean noTask; + private String partsSubdir; + private static final long MAX_PARTIAL_SIZE = 4096; + + /** Constructor with filename, socket, and part subdir + * @param filename name of file to download + * @param socket socket to use + * @param partsSubdir directory to store .part files + */ + public ClientDownloadPartUDP(String filename, DatagramSocket socket, String partsSubdir) { + this.partsSubdir = partsSubdir; + this.filename = filename; + this.socket = socket; + stop = false; + failed = false; + pendingTasks = new ArrayList<>(); + toDoTasks = new ArrayList<>(); + tasksDone = new ArrayList<>(); + noTask = true; + tasksListsLock = false; + } + + /** True if thread has failed to get a file. + * @return true if thread has failed to get a file + */ + public boolean hasFailed() { + return failed; + } + + /** Asks to stop thread. + * @throws InterruptedException + */ + public synchronized void setStop() throws InterruptedException { + stop = true; + this.notifyAll(); + } + + /** Runnable implementation */ + public void run() { + while(!stop) { + try { + doTasks(); + } catch(InterruptedException e) { + try { + setStop(); + } catch (InterruptedException e2) { + } + } + } + System.err.println("Closing socket"); + socket.close(); + } + + /** Get list of offsets that have not be downloaded if failed, else + * empty list. + * @return list of offsets + */ + public List getFailed() { + List ret = new ArrayList<>(); + if (failed) { + ret.addAll(pendingTasks); + ret.addAll(toDoTasks); + } + return ret; + } + + /** Get list of downloaded file parts offset, then clear this list. + * @return list of offsets + * @throws InterruptedException + */ + public List getDone() throws InterruptedException { + if (tasksDone.size() == 0) { + return new ArrayList<>(); + } else { + synchronized (this) { + while(tasksListsLock) { + this.wait(); + } + tasksListsLock = true; + List ret = new ArrayList<>(tasksDone); + tasksDone.clear(); + tasksListsLock = false; + this.notifyAll(); + return ret; + } + } + } + + /** Adds offset of files parts to download. + * @param task offset to download + * @throws InterruptedException + */ + public synchronized void assignTask(Long task) throws InterruptedException { + synchronized(this) { + while(tasksListsLock) { + this.wait(); + } + tasksListsLock = true; + toDoTasks.add(task); + noTask = false; + tasksListsLock = false; + this.notifyAll(); + } + } + + /** Send one request and wait for one response. Blocks when no task. + * @throws InterruptedException + */ + public synchronized void doTasks() throws InterruptedException { + while(noTask && !stop) { + this.wait(); + } + if (!stop) { + try { + Long offset = toDoTasks.get(0); + ProtocolP2PPacketUDP p = reqPart(offset); + if (p == null) { + stop = true; + } + + failed = downloadPart(p); + if (failed) { + System.err.println("Error: DownloadPart failed."); + stop = true; + } else if (toDoTasks.isEmpty()) { + noTask = true; + } + } catch (IndexOutOfBoundsException e) { + noTask = true; + } + } + } + + public ProtocolP2PPacketUDP reqPart(Long offset) { + System.err.println("New request: "+ offset); + // maintain tracking of tasks + if (toDoTasks.contains(offset)) { + try { + synchronized (this) { + while(tasksListsLock) { + this.wait(); + } + tasksListsLock = true; + toDoTasks.remove(offset); + pendingTasks.add(offset); + tasksListsLock = false; + this.notifyAll(); + } + } catch(InterruptedException e) { + System.err.println("Error: reqPart interruptedException"); + return null; + } + } else { + System.err.println("Error: reqPart (offset " + offset + " not in toDoTasks)"); + return null; + } + // send request + try { + ProtocolP2PPacketUDP d = new ProtocolP2PPacketUDP((Payload) new LoadRequest(filename, offset.longValue(), MAX_PARTIAL_SIZE)); + d.sendRequest((Object)socket); + return d; + } catch (InternalError e) { + System.err.println("Error: reqPart internalError"); + return null; + } catch (IOException e) { + e.printStackTrace(); + System.err.println("Error: reqPart ioexception"); + return null; + } + } + + public boolean downloadPart(ProtocolP2PPacketUDP d) { + if (d == null) { + System.err.println("Error: downloadPart -> d is null."); + return true; + } + try { + Payload p = d.receiveResponse().getPayload(); + assert p instanceof FilePart : "This payload must be instance of FilePart"; + if (!(p instanceof FilePart)) { + System.err.println("Error: cannot get size."); + return true; + } else { + FilePart fp = (FilePart)p; + if (!fp.getFilename().equals(filename)) { + System.err.println("Error: wrong file received: `" + fp.getFilename() + "`"); + return true; + } + Long offset = Long.valueOf(fp.getOffset()); + if (pendingTasks.contains(offset)) { + try { + Files.write(new File(partsSubdir + filename + "_" + offset + ".part").toPath(), fp.getPartialContent()); + } catch (IOException e) { + System.err.println("Error: cannot write file (" + partsSubdir + filename + "_" + offset + ".part)"); + } + } else { + System.err.println("Error: wrong file part received."); + return true; + } + try { + synchronized(this) { + while(tasksListsLock) { + this.wait(); + } + tasksListsLock = true; + pendingTasks.remove(offset); + tasksDone.add(offset); + tasksListsLock = false; + this.notifyAll(); + } + } catch(InterruptedException e) { + System.err.println("Error: DownloadPart Interrupted exception"); + return true; + } + } + } catch (EmptyDirectory e) { + System.err.println("Error: empty directory."); + return true; + } catch (EmptyFile e) { + System.err.println("Error: downloadPart emptyFile"); + // TODO: use more specific errors + return true; + } catch (ProtocolError e) { + System.err.println("Error: downloadPart protocolError"); + return true; + } catch (InternalRemoteError e) { + System.err.println("Error: downloadPart internalRemoteError"); + return true; + } catch (VersionRemoteError e) { + System.err.println("Error: downloadPart versionRemoteError"); + return true; + } catch (ProtocolRemoteError e) { + System.err.println("Error: downloadPart protocolRemoteError"); + return true; + } catch (TransmissionError e) { + System.err.println("Error: downloadPart transmissionError"); + return true; + } catch (VersionError e) { + System.err.println("Error: downloadPart versionError"); + return true; + } catch (SizeError e) { + System.err.println("Error: downloadPart sizeError"); + return true; + } catch (NotFound e) { + System.err.println("Error: downloadPart notFound"); + return true; + } catch (IOException e) { + System.err.println("Error: downloadPart ioexception"); + return true; + } catch (InternalError e) { + System.err.println("Error: downloadPart internalError"); + return true; + } + return false; + } + +} diff --git a/src/clientP2P/ClientDownloadUDP.java b/src/clientP2P/ClientDownloadUDP.java new file mode 100644 index 0000000..134ab2f --- /dev/null +++ b/src/clientP2P/ClientDownloadUDP.java @@ -0,0 +1,372 @@ +package clientP2P; +import clientP2P.ClientDownloadPartUDP; +import tools.HostItem; +import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Random; +import remoteException.EmptyDirectory; +import remoteException.EmptyFile; +import remoteException.VersionRemoteError; +import remoteException.ProtocolRemoteError; +import remoteException.NotFound; +import remoteException.InternalRemoteError; +import protocolP2P.HashAlgorithm; +import protocolP2P.HashResponse; +import protocolP2P.HashRequest; +import protocolP2P.ProtocolP2PPacketUDP; +import protocolP2P.Payload; +import exception.ProtocolError; +import exception.InternalError; +import exception.TransmissionError; +import exception.SizeError; +import exception.VersionError; +import protocolP2P.FilePart; +import protocolP2P.LoadRequest; +import java.io.IOException; +import java.nio.file.Files; +import java.io.File; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.nio.file.StandardCopyOption; + +/** Class to download file from udp +* @author Louis Royer +* @author Flavien Haas +* @author JS Auge +* @version 1.0 +*/ +public class ClientDownloadUDP implements Runnable { + + private List hostList; + private String filename; + private byte[] hash512; + private List sockList = new ArrayList(); + private List offsetsToAsk = new ArrayList(); + private List offsetsPending = new ArrayList(); + private boolean stop; + private long size; + private static final long MAX_PARTIAL_SIZE = 4096; + private String partsSubdir; + private String dirStorage; + private boolean success = false; + + /** Constructor with parameters: filename, list of hosts, parts subdirectory and dirStorage + * @param filename name of file to download + * @param hostList list of servers + * @param partsSubdir directory to store .part files + * @param dirStorage directory to write assembled file + */ + public ClientDownloadUDP(String filename, List hostList, String partsSubdir, String dirStorage) { + this.partsSubdir = partsSubdir; + this.dirStorage = dirStorage; + this.filename = filename; + this.hostList = hostList; + this.stop = false; + } + + /** Asks thread to stop + */ + public void setStop() { + stop = true; + } + + /** Runnable implementation + */ + public void run() { + try { + init(); + if (stop) { + System.err.println("File is smaller than part max size."); + hostList.get(0).closeUDPSocket(); + } else { + System.err.println("File is bigger than part max size."); + purgeList(); + initThreads(); + while(!stop) { + checkTasksStatus(); + assignTasks(); + } + } + System.err.println("Reassembling file parts."); + reassembleFile(); + } catch(InternalError e) { + System.err.println("Error while downloading file. Aborting."); + } finally { + stopTasks(); + } + } + + /** Starts threads for each server in hostList. + */ + private void initThreads() { + for(HostItem hostItem: hostList) { + sockList.add(new ClientDownloadPartUDP(filename, hostItem.getUDPSocket(), partsSubdir)); + } + for(ClientDownloadPartUDP c: sockList) { + Thread t = new Thread(c); + t.start(); + } + System.err.println("Threads initialized"); + } + + /** Remove tasks from failed threads. Update done status. + * @throws InternalError + */ + private void checkTasksStatus() throws InternalError { + List sockListCpy = new ArrayList<>(sockList); + for(ClientDownloadPartUDP c: sockListCpy) { + if (c.hasFailed() == true) { + sockList.remove(c); + offsetsPending.removeAll(c.getFailed()); + offsetsToAsk.addAll(c.getFailed()); + } + try { + offsetsPending.removeAll(c.getDone()); + } catch (InterruptedException e) { + throw new InternalError(); + } + } + System.err.println("Task check status: " + offsetsToAsk.size() + " to asks, " + offsetsPending.size() + " pending"); + if (offsetsToAsk.isEmpty() && offsetsPending.isEmpty()) { + stop = true; + } + if (sockList.size() == 0) { + System.err.println("No thread working"); + throw new InternalError(); + } + } + + /** Assign tasks randomly to threads. + * @throws InternalError + */ + private void assignTasks() throws InternalError { + Random rand = new Random(); + for(long offset : offsetsToAsk) { + try { + sockList.get(rand.nextInt(sockList.size())).assignTask(offset); + offsetsPending.add(offset); + System.err.println("Assigned task "+ offset); + } catch(InterruptedException e) { + throw new InternalError(); + } + } + offsetsToAsk.removeAll(offsetsPending); + } + + /** Stop threads */ + private void stopTasks() { + for(ClientDownloadPartUDP c : sockList) { + try { + c.setStop(); + } catch (InterruptedException e) {} + } + } + + /** Get hashsum from server. + * @param hostItem server to ask hash + * @return hash512sum + * @throws InternalError + */ + private byte[] getHashSum512(HostItem hostItem) throws InternalError { + byte[] hash; + HashAlgorithm[] hashesAlgo = new HashAlgorithm[1]; + hashesAlgo[0] = HashAlgorithm.SHA512; + ProtocolP2PPacketUDP d = new ProtocolP2PPacketUDP((Payload) new HashRequest(filename, hashesAlgo)); + try { + d.sendRequest((Object)hostItem.getUDPSocket()); + try { + Payload pHash = d.receiveResponse().getPayload(); + assert pHash instanceof HashResponse : "This payload must be instance of HashResponse"; + if (!(pHash instanceof HashResponse)) { + throw new InternalError(); + } else { + hash = ((HashResponse)pHash).getHash(HashAlgorithm.SHA512); + } + } catch (EmptyDirectory e) { + hash = new byte[0]; + } catch (NotFound e) { + hash = new byte[0]; + // TODO: use more specific errors + } catch (EmptyFile e) { + throw new InternalError(); + } catch (ProtocolError e) { + throw new InternalError(); + } catch (InternalRemoteError e) { + throw new InternalError(); + } catch (VersionRemoteError e) { + throw new InternalError(); + } catch (ProtocolRemoteError e) { + throw new InternalError(); + } catch (TransmissionError e) { + throw new InternalError(); + } catch (VersionError e) { + throw new InternalError(); + } catch (SizeError e) { + throw new InternalError(); + } + return hash; + } catch (IOException e) { + throw new InternalError(); + } + } + + /** Removes servers not owning the correct file to download from list. + * This is done by comparing hash512sum. + * @throws InternalError + */ + private void purgeList() throws InternalError { + List blackList = new ArrayList(); + boolean first = false; + byte[] hashsum; + for(HostItem host: hostList) { + // already have hashsum from 1st server + if (!first) { + first = true; + continue; + } + // ask hashsum + hashsum = getHashSum512(host); + if (!Arrays.equals(hash512, hashsum)) { + blackList.add(host); + } + } + // purge list + for(HostItem host: blackList) { + hostList.remove(host); + } + System.err.println("Host list purge: done"); + } + + /** Getter for hash512sum + * @return hash512sum + */ + public byte[] getHashSum512() { + return hash512; + } + + /** Initialize infos about file to download (size, hash512sum, partslist to dl). + * Also download first partfile (to get size). + * @throws InternalError + */ + private void init() throws InternalError { + // get size + setSize(); + + // get hashsum from 1st server in list + hash512 = getHashSum512(hostList.get(0)); + if (hash512.length == 0) { + System.err.println("Error: no hash512sum support."); + throw new InternalError(); + } + + // Add tasks + if (!stop) { + for(long i=MAX_PARTIAL_SIZE; i= size) { + success = true; + System.err.println("Reassembling: success"); + } else { + // append to file + try { + Files.write(new File(dirStorage + filename).toPath(), Files.readAllBytes(new File(partsSubdir + filename + "_" + nextOffset + ".part").toPath()), StandardOpenOption.APPEND); + nextOffset = (new File(dirStorage + filename)).length(); + } catch (IOException e) { + abort = true; + System.err.println("Aborting: bad number " + nextOffset); + } + } + } while((!success) && (!abort)); + } +} diff --git a/src/clientP2P/ClientManagementUDP.java b/src/clientP2P/ClientManagementUDP.java index 095e8ed..2e7c045 100644 --- a/src/clientP2P/ClientManagementUDP.java +++ b/src/clientP2P/ClientManagementUDP.java @@ -34,6 +34,7 @@ import protocolP2P.HashRequest; import protocolP2P.HashResponse; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import clientP2P.ClientDownloadUDP; /** Implementation of P2P-JAVA-PROJECT CLIENT * @author Louis Royer @@ -43,6 +44,7 @@ import java.security.NoSuchAlgorithmException; */ public class ClientManagementUDP implements Runnable { private String baseDirectory; + private String partsSubdir; private List hostList; /** Constructor for UDP implementation, with baseDirectory and UDPPort parameters. @@ -50,9 +52,10 @@ public class ClientManagementUDP implements Runnable { * @param host hostname of the server * @param UDPPort the server will listen on this port */ - public ClientManagementUDP(String baseDirectory, List hostList) { + public ClientManagementUDP(String baseDirectory, List hostList, String partsSubdir) { this.baseDirectory = baseDirectory; this.hostList = hostList; + this.partsSubdir = partsSubdir; } /** Implementation of Runnable @@ -115,106 +118,32 @@ public class ClientManagementUDP implements Runnable { * @throws EmptyFile */ private void download(String filename) throws EmptyFile, NotFound, InternalError, UnknownHostException, IOException, TransmissionError, ProtocolError, VersionError, SizeError, InternalRemoteError, ProtocolRemoteError, VersionRemoteError { - byte[] hash512; - final long MAX_PARTIAL_SIZE = 4096; - HashAlgorithm[] hashesAlgo = new HashAlgorithm[1]; - hashesAlgo[0] = HashAlgorithm.SHA512; - ProtocolP2PPacketUDP d = new ProtocolP2PPacketUDP((Payload) new HashRequest(filename, hashesAlgo)); - d.sendRequest((Object)hostList.get(0).getUDPSocket()); + ClientDownloadUDP downLoader = new ClientDownloadUDP(filename, hostList, partsSubdir, baseDirectory); + Thread t = new Thread(downLoader); + t.start(); try { - Payload pHash = d.receiveResponse().getPayload(); - assert pHash instanceof HashResponse : "This payload must be instance of HashResponse"; - if (!(pHash instanceof HashResponse)) { - throw new InternalError(); - } else { - hash512 = ((HashResponse)pHash).getHash(HashAlgorithm.SHA512); - if (hash512.length == 0) { - System.err.println("Error: server do not support sha512 hashes"); - throw new InternalError(); - } - } - } catch (EmptyDirectory e) { - System.err.println("Error: empty directory, but request was not LIST_REQUEST"); - throw new ProtocolError(); - } - - d = new ProtocolP2PPacketUDP((Payload) new LoadRequest(filename, 0, MAX_PARTIAL_SIZE)); - d.sendRequest((Object)hostList.get(0).getUDPSocket()); - boolean fileFullyWritten = false; - long offset = 0; - do { - try { - Payload p = d.receiveResponse().getPayload(); - assert p instanceof FilePart : "This payload must be instance of FilePart"; - if (!(p instanceof FilePart)) { - throw new InternalError(); - } else { - FilePart fp = (FilePart)p; - if (!fp.getFilename().equals(filename)) { - System.err.println("Error: wrong file received: `" + fp.getFilename() + "`"); - throw new ProtocolError(); + t.join(); + if (downLoader.getSuccess()) { + byte[] hash512 = downLoader.getHashSum512(); + if (!Arrays.equals(hash512, computeHashsum(filename, HashAlgorithm.SHA512))) { + System.err.println("Error: Hashsum does not match"); + System.err.println("Computed checksum:"); + byte[] c = computeHashsum(filename, HashAlgorithm.SHA512); + for (byte b: c) { + System.err.print(String.format("%02X", b)); } - if (fp.getOffset() == 0) { - System.err.println("Receiving first partialContent"); - // first partialContent - // increment offset - offset = fp.getPartialContent().length; - /* write first partialContent */ - try { - Files.write(new File(baseDirectory + filename).toPath(), fp.getPartialContent()); - } catch (IOException e) { - System.err.println("Error: cannot write file (" + baseDirectory + filename + ")"); - } - // next partialContentRequest - if (offset != fp.getTotalSize()) { - System.err.println("Sending following request with offset: " + offset + " maxpartialsize: " + MAX_PARTIAL_SIZE); - d = new ProtocolP2PPacketUDP((Payload) new LoadRequest(filename, offset, MAX_PARTIAL_SIZE)); - d.sendRequest((Object)hostList.get(0).getUDPSocket()); - } else { - fileFullyWritten = true; - } - } else if (offset == fp.getOffset()){ - System.err.println("Receiving following partialContent (offset: " + offset + ")"); - // following - // increment offset - offset += fp.getPartialContent().length; - /* write following partialContent at end of file*/ - try { - Files.write(Paths.get(baseDirectory + filename), fp.getPartialContent(), StandardOpenOption.APPEND); - } catch (IOException e) { - System.err.println("Error: cannot write file (" + baseDirectory + filename + ")"); - } - if (offset >= fp.getTotalSize()) { - fileFullyWritten = true; - } else { - // next partialContentRequest - d = new ProtocolP2PPacketUDP((Payload) new LoadRequest(filename, offset, MAX_PARTIAL_SIZE)); - d.sendRequest((Object)hostList.get(0).getUDPSocket()); - } - - } else { - System.err.println("offset: " + fp.getOffset() + " ; content.length: " + fp.getPartialContent().length + " ; totalSize: " + fp.getTotalSize()); - System.err.println("Error: cannot handle non-consecutive partial files (not implemented)"); - throw new InternalError(); + System.err.println(""); + System.err.println("Received checksum:"); + for (byte b: hash512) { + System.err.print(String.format("%02X", b)); } + System.err.println(""); + throw new InternalError(); } - } catch (EmptyDirectory e) { - throw new ProtocolError(); - } - } while(!fileFullyWritten); - if (!Arrays.equals(hash512, computeHashsum(filename, HashAlgorithm.SHA512))) { - System.err.println("Error: Hashsum does not match"); - System.err.println("Computed checksum:"); - byte[] c = computeHashsum(filename, HashAlgorithm.SHA512); - for (byte b: c) { - System.err.print(String.format("%02X", b)); - } - System.err.println(""); - System.err.println("Received checksum:"); - for (byte b: hash512) { - System.err.print(String.format("%02X", b)); + } else { + throw new InternalError(); } - System.err.println(""); + } catch (InterruptedException e) { throw new InternalError(); } } diff --git a/src/clientP2P/ClientP2P.java b/src/clientP2P/ClientP2P.java index f5ab3db..606f2cd 100644 --- a/src/clientP2P/ClientP2P.java +++ b/src/clientP2P/ClientP2P.java @@ -35,7 +35,7 @@ public class ClientP2P { case "udp": case "2" : System.out.println("Starting with UDP"); - ClientManagementUDP cmudp = new ClientManagementUDP(c.directories.getDataHomeDirectory(), c.hostList); + ClientManagementUDP cmudp = new ClientManagementUDP(c.directories.getDataHomeDirectory(), c.hostList, c.directories.getDataHomeDirectory() + c.parts + "/"); t = new Thread(cmudp); break; case "TCP":