FileMessageFactory.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.ha.deploy;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.buf.HexUtils;
import org.apache.tomcat.util.res.StringManager;
/**
* This factory is used to read files and write files by splitting them up into smaller messages. So that entire files
* don't have to be read into memory. <BR>
* The factory can be used as a reader or writer but not both at the same time. When done reading or writing the factory
* will close the input or output streams and mark the factory as closed. It is not possible to use it after that. <BR>
* To force a cleanup, call cleanup() from the calling object. <BR>
* This class is not thread safe.
*/
public class FileMessageFactory {
/*--Static Variables----------------------------------------*/
private static final Log log = LogFactory.getLog(FileMessageFactory.class);
private static final StringManager sm = StringManager.getManager(FileMessageFactory.class);
/**
* The number of bytes that we read from file
*/
public static final int READ_SIZE = 1024 * 10; // 10 KiB
/**
* The file that we are reading/writing
*/
protected final File file;
/**
* True means that we are writing with this factory. False means that we are reading with this factory
*/
protected final boolean openForWrite;
/**
* Once the factory is used, it cannot be reused.
*/
protected boolean closed = false;
/**
* When openForWrite=false, the input stream is held by this variable
*/
protected FileInputStream in;
/**
* When openForWrite=true, the output stream is held by this variable
*/
protected FileOutputStream out;
/**
* The number of messages we have written
*/
protected int nrOfMessagesProcessed = 0;
/**
* The total size of the file
*/
protected long size = 0;
/**
* The total number of packets that we split this file into
*/
protected long totalNrOfMessages = 0;
/**
* The number of the last message processed. Message IDs are 1 based.
*/
protected AtomicLong lastMessageProcessed = new AtomicLong(0);
/**
* Messages received out of order are held in the buffer until required. If everything is worked as expected,
* messages will spend very little time in the buffer.
*/
protected final Map<Long,FileMessage> msgBuffer = new ConcurrentHashMap<>();
/**
* The bytes that we hold the data in, not thread safe.
*/
protected byte[] data = new byte[READ_SIZE];
/**
* Flag that indicates if a thread is writing messages to disk. Access to this flag must be synchronised.
*/
protected boolean isWriting = false;
/**
* The time this instance was created. (in milliseconds)
*
* @deprecated Unused. This will be removed in Tomcat 11.
*/
@Deprecated
protected long creationTime = 0;
/**
* The time this instance was last modified.
*/
protected long lastModified = 0;
/**
* The maximum time (in seconds) this instance will be allowed to exist from lastModifiedTime.
*/
protected int maxValidTime = -1;
/**
* Private constructor, either instantiates a factory to read or write. <BR>
* When openForWrite==true, then a the file, f, will be created and an output stream is opened to write to it. <BR>
* When openForWrite==false, an input stream is opened, the file has to exist.
*
* @param f File - the file to be read/written
* @param openForWrite boolean - true means we are writing to the file, false means we are reading from the file
*
* @throws FileNotFoundException - if the file to be read doesn't exist
* @throws IOException - if the system fails to open input/output streams to the file or if it fails to
* create the file to be written to.
*/
private FileMessageFactory(File f, boolean openForWrite) throws FileNotFoundException, IOException {
this.file = f;
this.openForWrite = openForWrite;
if (log.isTraceEnabled()) {
log.trace("FileMessageFactory open file " + f + " write " + openForWrite);
}
if (openForWrite) {
if (!file.exists()) {
if (!file.createNewFile()) {
throw new IOException(sm.getString("fileNewFail", file));
}
}
out = new FileOutputStream(f);
} else {
size = file.length();
totalNrOfMessages = (size / READ_SIZE) + 1;
in = new FileInputStream(f);
} // end if
creationTime = System.currentTimeMillis();
lastModified = System.currentTimeMillis();
}
/**
* Creates a factory to read or write from a file. When opening for read, the readMessage can be invoked, and when
* opening for write the writeMessage can be invoked.
*
* @param f File - the file to be read or written
* @param openForWrite boolean - true, means we are writing to the file, false means we are reading from it
*
* @throws FileNotFoundException - if the file to be read doesn't exist
* @throws IOException - if it fails to create the file that is to be written
*
* @return FileMessageFactory
*/
public static FileMessageFactory getInstance(File f, boolean openForWrite)
throws FileNotFoundException, IOException {
return new FileMessageFactory(f, openForWrite);
}
/**
* Reads file data into the file message and sets the size, totalLength, totalNrOfMsgs and the message number <BR>
* If EOF is reached, the factory returns null, and closes itself, otherwise the same message is returned as was
* passed in. This makes sure that not more memory is ever used. To remember, neither the file message or the
* factory are thread safe. Don't hand off the message to one thread and read the same with another.
*
* @param f FileMessage - the message to be populated with file data
*
* @throws IllegalArgumentException - if the factory is for writing or is closed
* @throws IOException - if a file read exception occurs
*
* @return FileMessage - returns the same message passed in as a parameter, or null if EOF
*/
public FileMessage readMessage(FileMessage f) throws IllegalArgumentException, IOException {
checkState(false);
int length = in.read(data);
if (length == -1) {
cleanup();
return null;
} else {
f.setData(data, length);
f.setTotalNrOfMsgs(totalNrOfMessages);
f.setMessageNumber(++nrOfMessagesProcessed);
return f;
} // end if
}
/**
* Writes a message to file. If (msg.getMessageNumber() == msg.getTotalNrOfMsgs()) the output stream will be closed
* after writing.
*
* @param msg FileMessage - message containing data to be written
*
* @throws IllegalArgumentException - if the factory is opened for read or closed
* @throws IOException - if a file write error occurs
*
* @return returns true if the file is complete and outputstream is closed, false otherwise.
*/
public boolean writeMessage(FileMessage msg) throws IllegalArgumentException, IOException {
if (!openForWrite) {
throw new IllegalArgumentException(sm.getString("fileMessageFactory.cannotWrite"));
}
if (log.isTraceEnabled()) {
log.trace("Message " + msg + " data " + HexUtils.toHexString(msg.getData()) + " data length " +
msg.getDataLength() + " out " + out);
}
if (msg.getMessageNumber() <= lastMessageProcessed.get()) {
// Duplicate of message already processed
log.warn(sm.getString("fileMessageFactory.duplicateMessage", msg.getContextName(), msg.getFileName(),
HexUtils.toHexString(msg.getData()), Integer.valueOf(msg.getDataLength())));
return false;
}
FileMessage previous = msgBuffer.put(Long.valueOf(msg.getMessageNumber()), msg);
if (previous != null) {
// Duplicate of message not yet processed
log.warn(sm.getString("fileMessageFactory.duplicateMessage", msg.getContextName(), msg.getFileName(),
HexUtils.toHexString(msg.getData()), Integer.valueOf(msg.getDataLength())));
return false;
}
// Have received a new message. Update the last modified time (even if the message is being buffered for now).
lastModified = System.currentTimeMillis();
FileMessage next = null;
synchronized (this) {
if (!isWriting) {
next = msgBuffer.get(Long.valueOf(lastMessageProcessed.get() + 1));
if (next != null) {
isWriting = true;
} else {
return false;
}
} else {
return false;
}
}
while (next != null) {
out.write(next.getData(), 0, next.getDataLength());
lastMessageProcessed.incrementAndGet();
out.flush();
if (next.getMessageNumber() == next.getTotalNrOfMsgs()) {
out.close();
cleanup();
return true;
}
synchronized (this) {
next = msgBuffer.get(Long.valueOf(lastMessageProcessed.get() + 1));
if (next == null) {
isWriting = false;
}
}
}
return false;
}// writeMessage
/**
* Closes the factory, its streams and sets all its references to null
*/
public void cleanup() {
if (in != null) {
try {
in.close();
} catch (IOException ignore) {
}
}
if (out != null) {
try {
out.close();
} catch (IOException ignore) {
}
}
in = null;
out = null;
size = 0;
closed = true;
data = null;
nrOfMessagesProcessed = 0;
totalNrOfMessages = 0;
msgBuffer.clear();
lastMessageProcessed = null;
}
/**
* Check to make sure the factory is able to perform the function it is asked to do. Invoked by
* readMessage/writeMessage before those methods proceed.
*
* @param openForWrite The value to check
*
* @throws IllegalArgumentException if the state is not the expected one
*/
protected void checkState(boolean openForWrite) throws IllegalArgumentException {
if (this.openForWrite != openForWrite) {
cleanup();
if (openForWrite) {
throw new IllegalArgumentException(sm.getString("fileMessageFactory.cannotWrite"));
} else {
throw new IllegalArgumentException(sm.getString("fileMessageFactory.cannotRead"));
}
}
if (this.closed) {
cleanup();
throw new IllegalArgumentException(sm.getString("fileMessageFactory.closed"));
}
}
/**
* Example usage.
*
* @param args String[], args[0] - read from filename, args[1] write to filename
*
* @throws Exception An error occurred
*
* @deprecated This method will be removed in Tomcat 10.0.x
*/
@Deprecated
public static void main(String[] args) throws Exception {
System.out.println("Usage: FileMessageFactory fileToBeRead fileToBeWritten");
System.out.println("Usage: This will make a copy of the file on the local file system");
FileMessageFactory read = getInstance(new File(args[0]), false);
FileMessageFactory write = getInstance(new File(args[1]), true);
FileMessage msg = new FileMessage(null, args[0], args[0]);
msg = read.readMessage(msg);
if (msg == null) {
System.out.println("Empty input file : " + args[0]);
return;
}
System.out.println("Expecting to write " + msg.getTotalNrOfMsgs() + " messages.");
int cnt = 0;
while (msg != null) {
write.writeMessage(msg);
cnt++;
msg = read.readMessage(msg);
} // while
System.out.println("Actually wrote " + cnt + " messages.");
}/// main
public File getFile() {
return file;
}
public boolean isValid() {
if (maxValidTime > 0) {
long timeNow = System.currentTimeMillis();
long timeIdle = (timeNow - lastModified) / 1000L;
if (timeIdle > maxValidTime) {
cleanup();
if (file.exists()) {
if (file.delete()) {
log.warn(sm.getString("fileMessageFactory.delete", file, Long.toString(maxValidTime)));
} else {
log.warn(sm.getString("fileMessageFactory.deleteFail", file));
}
}
return false;
}
}
return true;
}
public int getMaxValidTime() {
return maxValidTime;
}
public void setMaxValidTime(int maxValidTime) {
this.maxValidTime = maxValidTime;
}
}