package clientP2P; import clientP2P.ClientDownloadPartTCP; import tools.HostItem; import java.util.List; import java.util.ArrayList; import java.util.Arrays; import java.util.Random; import remoteException.EmptyDirectory; import remoteException.EmptyFile; import remoteException.VersionRemoteError; import remoteException.ProtocolRemoteError; import remoteException.NotFound; import remoteException.InternalRemoteError; import protocolP2P.HashAlgorithm; import protocolP2P.HashResponse; import protocolP2P.HashRequest; import protocolP2P.ProtocolP2PPacketTCP; import protocolP2P.Payload; import exception.ProtocolError; import exception.InternalError; import exception.TransmissionError; import exception.SizeError; import exception.VersionError; import protocolP2P.FilePart; import protocolP2P.LoadRequest; import java.io.IOException; import java.nio.file.Files; import java.io.File; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.nio.file.StandardCopyOption; import exception.SocketClosed; /** Class to download file from tcp * @author Louis Royer * @author Flavien Haas * @author JS Auge * @version 1.0 */ public class ClientDownloadTCP implements Runnable { private List hostList; private String filename; private byte[] hash512; private List sockList = new ArrayList(); private List offsetsToAsk = new ArrayList(); private List offsetsPending = new ArrayList(); private boolean stop; private long size; private static final long MAX_PARTIAL_SIZE = 4096; private String partsSubdir; private String dirStorage; private boolean success = false; /** Constructor with parameters: filename, list of hosts, parts subdirectory and dirStorage * @param filename name of file to download * @param hostList list of servers * @param partsSubdir directory to store .part files * @param dirStorage directory to write assembled file */ public ClientDownloadTCP(String filename, List hostList, String partsSubdir, String dirStorage) { this.partsSubdir = partsSubdir; this.dirStorage = dirStorage; this.filename = filename; this.hostList = hostList; this.stop = false; } /** Asks thread to stop */ public void setStop() { stop = true; } /** Runnable implementation */ public void run() { try { init(); if (stop) { System.err.println("File is smaller than part max size."); hostList.get(0).closeTCPSocket(); } else { System.err.println("File is bigger than part max size."); purgeList(); initThreads(); while(!stop) { assignTasks(); checkTasksStatus(); } } System.err.println("Reassembling file parts."); reassembleFile(); } catch(InternalError e) { System.err.println("Error while downloading file. Aborting."); } finally { stopTasks(); } } /** Starts threads for each server in hostList. */ private void initThreads() { for(HostItem hostItem: hostList) { sockList.add(new ClientDownloadPartTCP(this, filename, hostItem.getTCPSocket(), partsSubdir)); } for(ClientDownloadPartTCP c: sockList) { Thread t = new Thread(c); t.start(); } System.err.println("Threads initialized"); } /** Remove tasks from failed threads. Update done status. * @throws InternalError */ private void checkTasksStatus() throws InternalError { try { synchronized(this) { this.wait(); List sockListCpy = new ArrayList<>(sockList); for(ClientDownloadPartTCP c: sockListCpy) { if (c.hasFailed() == true) { sockList.remove(c); offsetsPending.removeAll(c.getFailed()); offsetsToAsk.addAll(c.getFailed()); } try { offsetsPending.removeAll(c.getDone()); } catch (InterruptedException e) { throw new InternalError(); } } System.err.println("Task check status: " + offsetsToAsk.size() + " to asks, " + offsetsPending.size() + " pending"); if (offsetsToAsk.isEmpty() && offsetsPending.isEmpty()) { stop = true; } if (sockList.size() == 0) { System.err.println("No thread working"); throw new InternalError(); } } } catch (InterruptedException e) { throw new InternalError(); } } /** Assign tasks randomly to threads. * @throws InternalError */ private void assignTasks() throws InternalError { Random rand = new Random(); for(long offset : offsetsToAsk) { try { sockList.get(rand.nextInt(sockList.size())).assignTask(offset); offsetsPending.add(offset); System.err.println("Assigned task "+ offset); } catch(InterruptedException e) { throw new InternalError(); } } offsetsToAsk.removeAll(offsetsPending); } /** Stop threads */ private void stopTasks() { for(ClientDownloadPartTCP c : sockList) { try { c.setStop(); } catch (InterruptedException e) {} } } /** Get hashsum from server. * @param hostItem server to ask hash * @return hash512sum * @throws InternalError */ private byte[] getHashSum512(HostItem hostItem) throws InternalError { byte[] hash; HashAlgorithm[] hashesAlgo = new HashAlgorithm[1]; hashesAlgo[0] = HashAlgorithm.SHA512; ProtocolP2PPacketTCP d = new ProtocolP2PPacketTCP((Payload) new HashRequest(filename, hashesAlgo)); try { d.sendRequest((Object)hostItem.getTCPSocket()); try { Payload pHash = d.receiveResponse().getPayload(); assert pHash instanceof HashResponse : "This payload must be instance of HashResponse"; if (!(pHash instanceof HashResponse)) { throw new InternalError(); } else { hash = ((HashResponse)pHash).getHash(HashAlgorithm.SHA512); } } catch (EmptyDirectory e) { hash = new byte[0]; } catch (NotFound e) { hash = new byte[0]; // TODO: use more specific errors } catch (EmptyFile e) { throw new InternalError(); } catch (ProtocolError e) { throw new InternalError(); } catch (InternalRemoteError e) { throw new InternalError(); } catch (VersionRemoteError e) { throw new InternalError(); } catch (ProtocolRemoteError e) { throw new InternalError(); } catch (TransmissionError e) { throw new InternalError(); } catch (VersionError e) { throw new InternalError(); } catch (SizeError e) { throw new InternalError(); } return hash; } catch (IOException e) { throw new InternalError(); } catch (SocketClosed e){ System.err.println("getHashSum512 : SocketClosed"); throw new InternalError(); } } /** Removes servers not owning the correct file to download from list. * This is done by comparing hash512sum. * @throws InternalError */ private void purgeList() throws InternalError { List blackList = new ArrayList(); boolean first = false; byte[] hashsum; for(HostItem host: hostList) { // already have hashsum from 1st server if (!first) { first = true; continue; } // ask hashsum hashsum = getHashSum512(host); if (!Arrays.equals(hash512, hashsum)) { blackList.add(host); } } // purge list for(HostItem host: blackList) { hostList.remove(host); } System.err.println("Host list purge: done"); } /** Getter for hash512sum * @return hash512sum */ public byte[] getHashSum512() { return hash512; } /** Initialize infos about file to download (size, hash512sum, partslist to dl). * Also download first partfile (to get size). * @throws InternalError */ private void init() throws InternalError { // get size setSize(); // get hashsum from 1st server in list hash512 = getHashSum512(hostList.get(0)); if (hash512.length == 0) { System.err.println("Error: no hash512sum support."); throw new InternalError(); } // Add tasks if (!stop) { for(long i=MAX_PARTIAL_SIZE; i= size) { success = true; System.err.println("Reassembling: success"); } else { // append to file try { Files.write(new File(dirStorage + filename).toPath(), Files.readAllBytes(new File(partsSubdir + filename + "_" + nextOffset + ".part").toPath()), StandardOpenOption.APPEND); nextOffset = (new File(dirStorage + filename)).length(); } catch (IOException e) { abort = true; System.err.println("Aborting: bad number " + nextOffset); } } } while((!success) && (!abort)); } }