package clientP2P; import java.util.Map; import java.util.HashMap; import java.util.List; import java.util.ArrayList; import java.util.Arrays; import java.util.Random; import java.io.IOException; import java.io.File; import java.nio.file.Files; import java.nio.file.StandardOpenOption; import java.nio.file.StandardCopyOption; import exception.LocalException; import exception.RemoteException; import localException.SocketClosed; import localException.ProtocolError; import localException.InternalError; import localException.TransmissionError; import localException.SizeError; import localException.VersionError; import remoteException.EmptyDirectory; import remoteException.EmptyFile; import remoteException.VersionRemoteError; import remoteException.ProtocolRemoteError; import remoteException.NotFound; import remoteException.InternalRemoteError; import remoteException.NotATracker; import remoteException.UnknownHost; import protocolP2P.HashAlgorithm; import protocolP2P.HashResponse; import protocolP2P.HashRequest; import protocolP2P.Payload; import protocolP2P.FilePart; import protocolP2P.SizeRequest; import protocolP2P.SizeResponse; import protocolP2P.ProtocolP2PPacket; import protocolP2P.UpdateRatio; import clientP2P.ClientDownloadPart; import tools.HostItem; import tools.Logger; import tools.LogLevel; import tools.ServeErrors; /** Class to download file * @author Louis Royer * @author Flavien Haas * @author JS Auge * @version 1.0 */ public abstract class ClientDownload extends ServeErrors implements Runnable { protected List hostList; protected String filename; protected byte[] hash512; protected List sockList = new ArrayList(); protected Map ratioUpdater = new HashMap<>(); protected List offsetsToAsk = new ArrayList(); protected List offsetsPending = new ArrayList(); protected boolean stop; protected long size; protected static final long MAX_PARTIAL_SIZE = 4096; protected String partsSubdir; protected String dirStorage; protected boolean success = false; protected Logger logger; protected HostItem client; protected HostItem tracker; /** Constructor with parameters: filename, list of hosts, parts subdirectory and dirStorage * @param filename name of file to download * @param hostList list of servers * @param partsSubdir directory to store .part files * @param dirStorage directory to write assembled file * @param logger Logger * @param client HostItem of the application * @param tracker HostItem of the tracker */ public ClientDownload(String filename, List hostList, String partsSubdir, String dirStorage, Logger logger, HostItem client, HostItem tracker) { this.partsSubdir = partsSubdir; this.dirStorage = dirStorage; this.filename = filename; this.hostList = hostList; this.logger = logger; this.client = client; this.tracker = tracker; this.stop = false; } /** Success getter. * @return true when file have successfully been reassembled. */ public boolean getSuccess() { return success; } /** Getter for hash512sum * @return hash512sum */ public byte[] getHashSum512() { return hash512; } /** Stop threads */ protected void stopTasks() { for(ClientDownloadPart c : sockList) { try { c.setStop(); } catch (InterruptedException e) { writeLog(e, LogLevel.Error); } } } /** Asks thread to stop */ public void setStop() { stop = true; } /** Assign tasks randomly to threads. * @throws InternalError */ protected void assignTasks() throws InternalError { Random rand = new Random(); for(long offset : offsetsToAsk) { try { sockList.get(rand.nextInt(sockList.size())).assignTask(offset); offsetsPending.add(offset); System.err.println("Assigned task "+ offset); writeLog("Assigned task "+ offset, LogLevel.Info); } catch(InterruptedException e) { writeLog(e, LogLevel.Error); throw new InternalError(); } } offsetsToAsk.removeAll(offsetsPending); } /** Create a clientDownloadPart * @param hostItem Hostitem of the server */ protected abstract ClientDownloadPart createDownloadPart(HostItem hostItem); /** Starts threads for each server in hostList. */ protected void initThreads() { for(HostItem hostItem: hostList) { sockList.add(createDownloadPart(hostItem)); } for(ClientDownloadPart c: sockList) { Thread t = new Thread(c); t.start(); } writeLog("Threads initialized", LogLevel.Info); } /** Remove tasks from failed threads. Update done status. * @throws InternalError */ protected void checkTasksStatus() throws InternalError { try { synchronized(this) { this.wait(); List sockListCpy = new ArrayList<>(sockList); for(ClientDownloadPart c: sockListCpy) { if (c.hasFailed() == true) { sockList.remove(c); offsetsPending.removeAll(c.getFailed()); offsetsToAsk.addAll(c.getFailed()); } try { offsetsPending.removeAll(c.getDone()); } catch (InterruptedException e) { throw new InternalError(); } ratioUpdater.put(c.getServer(), c.getReceivedBytesCount()); } writeLog("Task check status: " + offsetsToAsk.size() + " to asks, " + offsetsPending.size() + " pending", LogLevel.Info); if (offsetsToAsk.isEmpty() && offsetsPending.isEmpty()) { stop = true; } if (sockList.size() == 0) { logger.writeUDP("No thread working", LogLevel.Error); throw new InternalError(); } } } catch (InterruptedException e) { throw new InternalError(); } } /** Send Ratio update to the tracker */ public void sendRatioUpdate() { for(HostItem server: ratioUpdater.keySet()) { Long r = ratioUpdater.get(server); if (r != null) { long rl = r.longValue(); if (rl != 0) { try { ProtocolP2PPacket d = createProtocolP2PPacket(new UpdateRatio(client, server, rl)); d.sendRequest(getHostItemSocket(tracker)); } catch (Exception e) { writeLog(e, LogLevel.Error); } } } } } /** Get hashsum from server. * @param hostItem server to ask hash * @return hash512sum * @throws InternalError */ protected byte[] getHashSum512(HostItem hostItem) throws InternalError { byte[] hash; HashAlgorithm[] hashesAlgo = new HashAlgorithm[1]; hashesAlgo[0] = HashAlgorithm.SHA512; ProtocolP2PPacket d = createProtocolP2PPacket(new HashRequest(filename, hashesAlgo)); try { d.sendRequest(getHostItemSocket(hostItem)); try { Payload pHash = d.receiveResponse().getPayload(); assert pHash instanceof HashResponse : "This payload must be instance of HashResponse"; if (!(pHash instanceof HashResponse)) { throw new InternalError(); } else { hash = ((HashResponse)pHash).getHash(HashAlgorithm.SHA512); } } catch (EmptyDirectory e) { writeLog(e, LogLevel.Error); hash = new byte[0]; } catch (NotFound e) { writeLog(e, LogLevel.Error); hash = new byte[0]; } catch (LocalException e) { writeLog(e, LogLevel.Error); throw new InternalError(); } catch (RemoteException e) { writeLog(e, LogLevel.Error); throw new InternalError(); } return hash; } catch (IOException e) { writeLog(e, LogLevel.Error); throw new InternalError(); } catch (SocketClosed e){ System.err.println("getHashSum512 : SocketClosed"); writeLog("getHashSum512 : SocketClosed", LogLevel.Error); throw new InternalError(); } } /** Removes servers not owning the correct file to download from list. * This is done by comparing hash512sum. * @throws InternalError */ protected void purgeList() throws InternalError { List blackList = new ArrayList(); boolean first = false; byte[] hashsum; for(HostItem host: hostList) { // already have hashsum from 1st server if (!first) { first = true; continue; } // ask hashsum hashsum = getHashSum512(host); if (!Arrays.equals(hash512, hashsum)) { blackList.add(host); } } // purge list for(HostItem host: blackList) { hostList.remove(host); } writeLog("Host list purge: done", LogLevel.Info); } /** Reassemble file from file parts. * Set success to true if file is reassembled successfully. */ protected void reassembleFile() { boolean firstPart = true; boolean abort = false; long nextOffset = 0; do { if (firstPart) { writeLog("Reassembling: First part", LogLevel.Info); try { // create file Files.copy(new File(partsSubdir + filename + "_" + nextOffset + ".part").toPath(), new File(dirStorage + filename).toPath(), StandardCopyOption.REPLACE_EXISTING); nextOffset = (new File(dirStorage + filename)).length(); firstPart = false; } catch (IOException e) { writeLog("Reassembling: aborting on first part", LogLevel.Warning); abort = true; } } else if (nextOffset >= size) { success = true; writeLog("Reassembling: success", LogLevel.Info); } else { // append to file try { Files.write(new File(dirStorage + filename).toPath(), Files.readAllBytes(new File(partsSubdir + filename + "_" + nextOffset + ".part").toPath()), StandardOpenOption.APPEND); nextOffset = (new File(dirStorage + filename)).length(); } catch (IOException e) { abort = true; writeLog("Aborting: bad number " + nextOffset, LogLevel.Error); } } } while((!success) && (!abort)); } /** Set size of file to download. Also download first file part. * @throws InternalError */ protected void setSize() throws InternalError { ProtocolP2PPacket d = createProtocolP2PPacket(new SizeRequest(filename)); try { d.sendRequest(getHostItemSocket(hostList.get(0))); try { Payload p = d.receiveResponse().getPayload(); assert p instanceof SizeResponse : "This payload must be instance of SizeResponse"; if (!(p instanceof SizeResponse)) { writeLog("cannot get size.", LogLevel.Error); throw new InternalError(); } else { SizeResponse fp = (SizeResponse)p; if (!fp.getFilename().equals(filename)) { writeLog("wrong file size received: `" + fp.getFilename() + "`", LogLevel.Error); throw new ProtocolError(); } size = fp.getTotalSize(); } } catch (EmptyDirectory e) { System.err.println("Error: empty directory."); writeLog("empty directory.", LogLevel.Error); throw new InternalError(); } catch (LocalException e) { writeLog(e, LogLevel.Error); throw new InternalError(); } catch (RemoteException e) { writeLog(e, LogLevel.Error); throw new InternalError(); } } catch (IOException e) { writeLog(e, LogLevel.Error); throw new InternalError(); } catch (SocketClosed e){ System.err.println("setSize : SocketClosed"); writeLog("setSize : SocketClosed", LogLevel.Error); } } /** Close HostItem socket * @param hostItem HostItem */ protected abstract void closeHostItemSocket(HostItem hostItem); /** Runnable implementation */ public void run() { try { init(); if (stop) { writeLog("File is smaller than part max size.", LogLevel.Info); closeHostItemSocket(hostList.get(0)); } else { writeLog("File is bigger than part max size.", LogLevel.Info); purgeList(); initThreads(); while(!stop) { assignTasks(); checkTasksStatus(); } } writeLog("Reassembling file parts.", LogLevel.Info); reassembleFile(); } catch(InternalError e) { writeLog("Error while downloading file. Aborting.", LogLevel.Error); } finally { stopTasks(); } } /** Initialize infos about file to download (size, hash512sum, partslist to dl). * Also download first partfile (to get size). * @throws InternalError */ protected void init() throws InternalError { // get size setSize(); // get hashsum from 1st server in list hash512 = getHashSum512(hostList.get(0)); if (hash512.length == 0) { writeLog("no hash512sum support.", LogLevel.Error); throw new InternalError(); } // Add tasks if (!stop) { for(long i=0; i