Class FileSystem
java.lang.Object
  org.apache.flink.core.fs.FileSystem
All Implemented Interfaces:
IFileSystem
Direct Known Subclasses:
ForStFlinkFileSystem, HadoopFileSystem, LimitedConnectionsFileSystem, LocalFileSystem, SafetyNetWrapperFileSystem
@Public public abstract class FileSystem extends Object implements IFileSystem
Abstract base class of all file systems used by Flink. This class may be extended to implement distributed file systems or local file systems. The abstraction provided by this file system is deliberately simple and its set of operations limited, so that it supports the common denominator of a wide range of file systems. For example, appending to or mutating existing files is not supported.
Flink implements and supports some file system types directly (for example, the default machine-local file system). Other file system types are accessed through an implementation that bridges to the suite of file systems supported by Hadoop (for example, HDFS).
Scope and Purpose
The purpose of this abstraction is to expose a common and well-defined interface for accessing files. This abstraction is used both by Flink's fault tolerance mechanism (storing state and recovery data) and by reusable built-in connectors (file sources/sinks).
The purpose of this abstraction is not to give user programs an abstraction with extreme flexibility and control across all possible file systems. That mission would be folly, as the differences in characteristics of even the most common file systems are already quite large. User programs that need specialized functionality of certain file systems in their functions, operations, sources, or sinks are expected to instantiate the specialized file system adapters directly.
Data Persistence Contract
The FileSystem's output streams are used to persistently store data, both for results of streaming applications and for fault tolerance and recovery. It is therefore crucial that the persistence semantics of these streams are well defined.
Definition of Persistence Guarantees
Data written to an output stream is considered persistent if two requirements are met:
- Visibility Requirement: It must be guaranteed that all other processes, machines, virtual machines, containers, etc. that are able to access the file see the data consistently when given the absolute file path. This requirement is similar to the close-to-open semantics defined by POSIX, but restricted to the file itself (by its absolute path).
- Durability Requirement: The file system's specific durability/persistence requirements must be met. These are specific to the particular file system. For example, the LocalFileSystem does not provide any durability guarantees against crashes of the hardware or the operating system, while replicated distributed file systems (like HDFS) typically guarantee durability in the presence of fewer than n concurrent node failures, where n is the replication factor.
Updates to the file's parent directory (such that the file shows up when listing the directory contents) are not required to be complete for the data in the file stream to be considered persistent. This relaxation is important for file systems where updates to directory contents are only eventually consistent.
The FSDataOutputStream has to guarantee data persistence for the written bytes once the call to FSDataOutputStream.close() returns.
Examples
Fault-tolerant distributed file systems
For fault-tolerant distributed file systems, data is considered persistent once it has been received and acknowledged by the file system, typically by having been replicated to a quorum of machines (durability requirement). In addition, the absolute file path must be visible to all other machines that will potentially access the file (visibility requirement).
Whether data has hit non-volatile storage on the storage nodes depends on the specific guarantees of the particular file system.
The metadata updates to the file's parent directory are not required to have reached a consistent state. It is permissible that some machines see the file when listing the parent directory's contents while others do not, as long as access to the file by its absolute path is possible on all nodes.
Local file systems
A local file system must support the POSIX close-to-open semantics. Because the local file system does not have any fault tolerance guarantees, no further requirements exist.
The above implies specifically that data may still be in the OS cache when considered persistent from the local file system's perspective. Crashes that cause the OS cache to lose data are considered fatal to the local machine and are not covered by the local file system's guarantees as defined by Flink.
This means that computed results, checkpoints, and savepoints that are written only to the local file system are not guaranteed to survive a failure of the local machine, which makes local file systems unsuitable for production setups.
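To illustrate the gap between the two requirements on a local disk, the following JDK-only sketch (plain java.nio, not Flink's LocalFileSystem; the class name LocalPersistenceDemo is hypothetical) contrasts a write that is merely closed, which is enough for close-to-open visibility by absolute path, with a write that additionally calls FileChannel.force(true) to push the bytes to the storage device. The forced variant goes beyond what Flink's local file system contract requires, and neither variant helps if the machine itself is lost.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper contrasting visibility (close) with device-level durability (force).
final class LocalPersistenceDemo {
    private LocalPersistenceDemo() {}

    /** Writes and closes. A later open by absolute path sees the data (close-to-open). */
    static void writeVisible(Path file, String text) throws IOException {
        // Closing the stream publishes the bytes to other readers,
        // but they may still sit in the OS page cache.
        try (OutputStream out = Files.newOutputStream(file.toAbsolutePath())) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        }
    }

    /** Like writeVisible, but also forces content and metadata to the storage device. */
    static void writeDurable(Path file, String text) throws IOException {
        try (FileChannel ch = FileChannel.open(file.toAbsolutePath(),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer buf = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
            while (buf.hasRemaining()) {
                ch.write(buf);
            }
            ch.force(true); // true = also force file metadata
        }
    }

    /** Reads the file back by its absolute path. */
    static String readBack(Path file) throws IOException {
        return new String(Files.readAllBytes(file.toAbsolutePath()), StandardCharsets.UTF_8);
    }
}
```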
Updating File Contents
Many file systems either do not support overwriting contents of existing files at all, or do not support consistent visibility of the updated contents in that case. For that reason, Flink's FileSystem supports neither appending to existing files nor seeking within output streams to overwrite previously written data.
Overwriting Files
Overwriting files is generally possible. A file is overwritten by deleting it and creating a new file. However, certain file systems cannot make that change synchronously visible to all parties that have access to the file. For example, Amazon S3 historically guaranteed only eventual consistency for the visibility of a file replacement (it has offered strong read-after-write consistency since December 2020): some machines may see the old file, while others see the new one.
To avoid these consistency issues, the implementations of failure/recovery mechanisms in Flink strictly avoid writing to the same file path more than once.
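A minimal JDK-only sketch of this write-once discipline: each write goes to a freshly generated path and is opened with StandardOpenOption.CREATE_NEW, so accidentally reusing a path fails instead of silently replacing a file. The class name WriteOnceStore and the attempt-<n>-<uuid> naming are hypothetical and do not reflect the paths Flink actually uses.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

// Hypothetical writer that never writes the same path twice.
final class WriteOnceStore {
    private final Path baseDir;

    WriteOnceStore(Path baseDir) {
        this.baseDir = baseDir;
    }

    /** Writes data to a brand-new path and returns that path. */
    Path writeNew(int attempt, byte[] data) throws IOException {
        Files.createDirectories(baseDir);
        // A fresh name per write: attempt number plus a random UUID.
        Path target = baseDir.resolve("attempt-" + attempt + "-" + UUID.randomUUID());
        // CREATE_NEW fails with FileAlreadyExistsException if the path is already taken.
        try (OutputStream out = Files.newOutputStream(target, StandardOpenOption.CREATE_NEW)) {
            out.write(data);
        }
        return target;
    }
}
```

Readers then refer to the returned path; a retry produces a new path rather than replacing the old file.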
Thread Safety
Implementations of FileSystem must be thread-safe: the same FileSystem instance is frequently shared across multiple threads in Flink and must be able to concurrently create input/output streams and list file metadata.
The FSDataInputStream and FSDataOutputStream implementations are strictly not thread-safe. Instances of the streams should also not be passed between threads in between read or write operations, because there are no guarantees about the visibility of operations across threads (many operations do not create memory fences).
Streams Safety Net
When application code obtains a FileSystem (via get(URI) or via Path.getFileSystem()), the FileSystem instantiates a safety net for that FileSystem. The safety net ensures that all streams created from the FileSystem are closed when the application task finishes (or is canceled or fails). That way, the task's threads do not leak connections.
Internal runtime code can explicitly obtain a FileSystem that does not use the safety net via getUnguardedFileSystem(URI).
- See Also:
FSDataInputStream, FSDataOutputStream
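The idea behind the streams safety net can be sketched with plain JDK types: a per-task registry records every stream handed out and closes whatever is still open when the task ends. This is an illustrative model only; StreamSafetyNet and its methods are hypothetical and do not reproduce the classes Flink uses internally (for instance, it never unregisters streams the task closed itself, relying on the Closeable contract that a second close has no effect).

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical per-task registry that closes leaked streams when the task ends.
final class StreamSafetyNet implements Closeable {
    private final Deque<Closeable> open = new ArrayDeque<>();
    private boolean closed;

    /** Registers a stream created on behalf of the task and returns it unchanged. */
    synchronized <T extends Closeable> T register(T stream) throws IOException {
        if (closed) {
            // The task has already ended: refuse to hand out new resources.
            stream.close();
            throw new IOException("safety net already closed");
        }
        open.push(stream);
        return stream;
    }

    /** Called when the task finishes, fails, or is canceled. */
    @Override
    public synchronized void close() throws IOException {
        closed = true;
        IOException first = null;
        while (!open.isEmpty()) {
            try {
                // Per the Closeable contract, closing an already-closed stream has no effect.
                open.pop().close();
            } catch (IOException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```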
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static class | FileSystem.FSKey | An identifier of a file system, via its scheme and its authority.
static class | FileSystem.WriteMode | The possible write modes.
-
Constructor Summary
Constructors
Constructor | Description
FileSystem() |
-
Method Summary
Modifier and Type | Method | Description
FSDataOutputStream | create(Path f, boolean overwrite) | Deprecated. Use create(Path, FileSystem.WriteMode) instead.
FSDataOutputStream | create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize) | Deprecated. Deprecated because not well supported across types of file systems.
abstract FSDataOutputStream | create(Path f, FileSystem.WriteMode overwriteMode) | Opens an FSDataOutputStream to a new file at the given path.
RecoverableWriter | createRecoverableWriter() | Creates a new RecoverableWriter.
RecoverableWriter | createRecoverableWriter(Map<String,String> conf) | Creates a new RecoverableWriter.
abstract boolean | delete(Path f, boolean recursive) | Delete a file.
boolean | exists(Path f) | Checks whether the given path exists.
static FileSystem | get(URI uri) | Returns a reference to the FileSystem instance for accessing the file system identified by the given URI.
long | getDefaultBlockSize() | Deprecated. This value is no longer used and is meaningless.
static URI | getDefaultFsUri() | Gets the default file system URI that is used for paths and file systems that do not specify an explicit scheme.
abstract BlockLocation[] | getFileBlockLocations(FileStatus file, long start, long len) | Return an array containing hostnames, offset and size of portions of the given file.
abstract FileStatus | getFileStatus(Path f) | Return a file status object that represents the path.
abstract Path | getHomeDirectory() | Returns the path of the user's home directory in this file system.
static FileSystem | getLocalFileSystem() | Returns a reference to the FileSystem instance for accessing the local file system.
static FileSystem | getUnguardedFileSystem(URI fsUri) |
abstract URI | getUri() | Returns a URI whose scheme and authority identify this file system.
abstract Path | getWorkingDirectory() | Returns the path of the file system's current working directory.
static void | initialize(Configuration config) | Deprecated. Use initialize(Configuration, PluginManager) instead.
static void | initialize(Configuration config, PluginManager pluginManager) | Initializes the shared file system settings.
boolean | initOutPathDistFS(Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory) | Initializes output directories on distributed file systems according to the given write mode.
boolean | initOutPathLocalFS(Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory) | Initializes output directories on local file systems according to the given write mode.
abstract boolean | isDistributedFS() | Returns true if this is a distributed file system.
abstract FileStatus[] | listStatus(Path f) | List the statuses of the files/directories in the given path if the path is a directory.
abstract boolean | mkdirs(Path f) | Make the given file and all non-existent parents into directories.
abstract FSDataInputStream | open(Path f) | Opens an FSDataInputStream at the indicated Path.
abstract FSDataInputStream | open(Path f, int bufferSize) | Opens an FSDataInputStream at the indicated Path.
abstract boolean | rename(Path src, Path dst) | Renames the file/directory src to dst.
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.flink.core.fs.IFileSystem
canCopyPaths
Method Detail
-
initialize
@Deprecated public static void initialize(Configuration config) throws IllegalConfigurationException
Deprecated. Use initialize(Configuration, PluginManager) instead.
Initializes the shared file system settings.
The given configuration is passed to each file system factory to initialize the respective file systems. Because the configuration of file systems may change as a result of this call, this method clears the file system instance cache.
This method also reads the default file system URI from the configuration key CoreOptions.DEFAULT_FILESYSTEM_SCHEME. All calls to get(URI) where the URI has no scheme will be interpreted as relative to that URI. As an example, assume the default file system URI is set to 'hdfs://localhost:9000/'. A file path of '/user/USERNAME/in.txt' is interpreted as 'hdfs://localhost:9000/user/USERNAME/in.txt'.
- Parameters:
config - the configuration from which to fetch the parameter.
- Throws:
IllegalConfigurationException
-
initialize
public static void initialize(Configuration config, @Nullable PluginManager pluginManager) throws IllegalConfigurationException
Initializes the shared file system settings.
The given configuration is passed to each file system factory to initialize the respective file systems. Because the configuration of file systems may change as a result of this call, this method clears the file system instance cache.
This method also reads the default file system URI from the configuration key CoreOptions.DEFAULT_FILESYSTEM_SCHEME. All calls to get(URI) where the URI has no scheme will be interpreted as relative to that URI. As an example, assume the default file system URI is set to 'hdfs://localhost:9000/'. A file path of '/user/USERNAME/in.txt' is interpreted as 'hdfs://localhost:9000/user/USERNAME/in.txt'.
- Parameters:
config - the configuration from which to fetch the parameter.
pluginManager - optional plugin manager that is used to initialize file systems provided as plugins.
- Throws:
IllegalConfigurationException
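The resolution rule described above can be illustrated with java.net.URI alone. This is a sketch under stated assumptions, not Flink's implementation: DefaultSchemeResolver is a hypothetical name, and it assumes that a scheme-less URI simply takes the scheme and authority of the default file system URI while keeping its own path.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper mimicking how scheme-less paths are resolved against a default FS URI.
final class DefaultSchemeResolver {
    private DefaultSchemeResolver() {}

    static URI resolve(URI defaultFsUri, URI pathUri) throws URISyntaxException {
        if (pathUri.getScheme() != null) {
            return pathUri; // an explicit scheme is used as-is
        }
        // Scheme and authority come from the default URI; the path comes from the given URI.
        return new URI(defaultFsUri.getScheme(), defaultFsUri.getAuthority(),
                pathUri.getPath(), null, null);
    }
}
```

With a default of hdfs://localhost:9000/, resolving /user/USERNAME/in.txt yields hdfs://localhost:9000/user/USERNAME/in.txt, matching the example above. In a Flink setup the default would come from the configuration, for example config.set(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "hdfs://localhost:9000/") followed by FileSystem.initialize(config, null); that snippet assumes a Flink version that provides Configuration.set(ConfigOption<T>, T) and is not exercised by the sketch.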
-
getLocalFileSystem
public static FileSystem getLocalFileSystem()
Returns a reference to the FileSystem instance for accessing the local file system.
- Returns:
a reference to the FileSystem instance for accessing the local file system.
-
get
public static FileSystem get(URI uri) throws IOException
Returns a reference to the FileSystem instance for accessing the file system identified by the given URI.
- Parameters:
uri - the URI identifying the file system
- Returns:
a reference to the FileSystem instance for accessing the file system identified by the given URI.
- Throws:
IOException - thrown if a reference to the file system instance could not be obtained
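As a rough mental model of "a reference to the FileSystem instance", the sketch below shares one instance per scheme and authority, which is the identity the nested FileSystem.FSKey class describes. It is JDK-only and hypothetical: FsInstanceCache, its Key class, and the factory function are illustrative names, and Flink's actual lookup additionally involves file system factories, plugins, and the streams safety net.

```java
import java.net.URI;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical cache handing out one instance per (scheme, authority).
final class FsInstanceCache<T> {
    /** Identity of a file system: scheme plus authority (authority may be null). */
    static final class Key {
        final String scheme;
        final String authority;

        Key(String scheme, String authority) {
            this.scheme = scheme;
            this.authority = authority;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Key)) return false;
            Key k = (Key) o;
            return scheme.equals(k.scheme) && Objects.equals(authority, k.authority);
        }

        @Override
        public int hashCode() {
            return Objects.hash(scheme, authority);
        }
    }

    private final Map<Key, T> instances = new ConcurrentHashMap<>();
    private final Function<URI, T> factory;

    FsInstanceCache(Function<URI, T> factory) {
        this.factory = factory;
    }

    /** Returns the shared instance for the URI's scheme and authority, creating it once. */
    T get(URI uri) {
        if (uri.getScheme() == null) {
            throw new IllegalArgumentException("URI has no scheme: " + uri);
        }
        // URI schemes are case-insensitive, so normalize before building the key.
        Key key = new Key(uri.getScheme().toLowerCase(Locale.ROOT), uri.getAuthority());
        return instances.computeIfAbsent(key, k -> factory.apply(uri));
    }
}
```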
-
getUnguardedFileSystem
@Internal public static FileSystem getUnguardedFileSystem(URI fsUri) throws IOException
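Returns a FileSystem for the given URI that does not use the streams safety net. This is intended for internal runtime code.
- Throws:
IOException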
-
getDefaultFsUri
public static URI getDefaultFsUri()
Gets the default file system URI that is used for paths and file systems that do not specify an explicit scheme.
As an example, assume the default file system URI is set to 'hdfs://someserver:9000/'. A file path of '/user/USERNAME/in.txt' is interpreted as 'hdfs://someserver:9000/user/USERNAME/in.txt'.
- Returns:
The default file system URI
-
create
@Deprecated public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException
Deprecated. Deprecated because not well supported across types of file systems. Control the behavior of specific file systems via configurations instead.
Opens an FSDataOutputStream at the indicated Path.
This method is deprecated because most of its parameters are ignored by most file systems. To control, for example, the replication factor and block size in the Hadoop Distributed File System, make sure that the respective Hadoop configuration file is either linked from the Flink configuration or in the classpath of either Flink or the user code.
- Parameters:
f - the file name to open
overwrite - if a file with this name already exists, then if true, the file will be overwritten, and if false an error will be thrown.
bufferSize - the size of the buffer to be used.
replication - required block replication for the file.
blockSize - the size of the file blocks
- Throws:
IOException - Thrown if the stream could not be opened because of an I/O error, or because a file already exists at that path and the write mode indicates to not overwrite the file.
-
create
@Deprecated public FSDataOutputStream create(Path f, boolean overwrite) throws IOException
Deprecated. Use create(Path, FileSystem.WriteMode) instead.
Opens an FSDataOutputStream at the indicated Path.
- Parameters:
f - the file name to open
overwrite - if a file with this name already exists, then if true, the file will be overwritten, and if false an error will be thrown.
- Throws:
IOException - Thrown if the stream could not be opened because of an I/O error, or because a file already exists at that path and the write mode indicates to not overwrite the file.
-
getDefaultBlockSize
@Deprecated public long getDefaultBlockSize()
Deprecated. This value is no longer used and is meaningless.
Returns the number of bytes into which large input files should optimally be split to minimize I/O time.
- Returns:
the number of bytes into which large input files should optimally be split to minimize I/O time
-
getWorkingDirectory
public abstract Path getWorkingDirectory()
Description copied from interface: IFileSystem
Returns the path of the file system's current working directory.
- Specified by:
getWorkingDirectory in interface IFileSystem
- Returns:
the path of the file system's current working directory
-
getHomeDirectory
public abstract Path getHomeDirectory()
Description copied from interface: IFileSystem
Returns the path of the user's home directory in this file system.
- Specified by:
getHomeDirectory in interface IFileSystem
- Returns:
the path of the user's home directory in this file system.
-
getUri
public abstract URI getUri()
Description copied from interface: IFileSystem
Returns a URI whose scheme and authority identify this file system.
- Specified by:
getUri in interface IFileSystem
- Returns:
a URI whose scheme and authority identify this file system
-
getFileStatus
public abstract FileStatus getFileStatus(Path f) throws IOException
Description copied from interface: IFileSystem
Return a file status object that represents the path.
- Specified by:
getFileStatus in interface IFileSystem
- Parameters:
f - The path we want information from
- Returns:
a FileStatus object
- Throws:
FileNotFoundException - when the path does not exist
IOException - see specific implementation
-
getFileBlockLocations
public abstract BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException
Description copied from interface: IFileSystem
Return an array containing hostnames, offset and size of portions of the given file. For a nonexistent file or region, null will be returned. This call is most helpful with a distributed file system (DFS), where it returns the hostnames of machines that contain the given file. A local file system will simply return an element containing 'localhost'.
- Specified by:
getFileBlockLocations in interface IFileSystem
- Throws:
IOException
-
open
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException
Description copied from interface: IFileSystem
Opens an FSDataInputStream at the indicated Path.
- Specified by:
open in interface IFileSystem
- Parameters:
f - the file name to open
bufferSize - the size of the buffer to be used.
- Throws:
IOException
-
open
public abstract FSDataInputStream open(Path f) throws IOException
Description copied from interface: IFileSystem
Opens an FSDataInputStream at the indicated Path.
- Specified by:
open in interface IFileSystem
- Parameters:
f - the file to open
- Throws:
IOException
-
createRecoverableWriter
public RecoverableWriter createRecoverableWriter() throws IOException
Description copied from interface: IFileSystem
Creates a new RecoverableWriter. A recoverable writer creates streams that can persist and recover their intermediate state. Persisting and recovering intermediate state is a core building block for writing to files that span multiple checkpoints.
The returned object can act as a shared factory to open and recover multiple streams.
This method is optional on file systems, and various file system implementations may not support this method, throwing an UnsupportedOperationException.
- Specified by:
createRecoverableWriter in interface IFileSystem
- Returns:
A RecoverableWriter for this file system.
- Throws:
IOException - Thrown if the recoverable writer cannot be instantiated.
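The persist/recover idea can be modeled with plain JDK file operations: bytes go to an in-progress file, persist() makes the bytes written so far durable and records their length, recover(...) truncates back to that point after a failure, and commit() publishes the file under its target name. This is a hypothetical sketch (InProgressFile and Resumable are invented names); it does not use or reproduce Flink's RecoverableWriter API, and in Flink the recoverable state would additionally have to be stored in checkpoints.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Hypothetical model of a file write that survives failures across checkpoints.
final class InProgressFile {
    /** Snapshot of progress: which temp file and how many bytes are durable. */
    static final class Resumable {
        final Path tempFile;
        final long length;

        Resumable(Path tempFile, long length) {
            this.tempFile = tempFile;
            this.length = length;
        }
    }

    private final Path target;
    private final Path tempFile;
    private final FileChannel channel;

    private InProgressFile(Path target, Path tempFile, FileChannel channel) {
        this.target = target;
        this.tempFile = tempFile;
        this.channel = channel;
    }

    /** Starts a new hidden in-progress file next to the target. */
    static InProgressFile open(Path target) throws IOException {
        Path temp = target.resolveSibling("." + target.getFileName() + ".inprogress");
        FileChannel ch = FileChannel.open(temp, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
        return new InProgressFile(target, temp, ch);
    }

    /** Resumes after a failure, dropping bytes written after the last persist(). */
    static InProgressFile recover(Path target, Resumable r) throws IOException {
        FileChannel ch = FileChannel.open(r.tempFile, StandardOpenOption.WRITE);
        ch.truncate(r.length);
        ch.position(r.length); // continue writing at the end of the durable prefix
        return new InProgressFile(target, r.tempFile, ch);
    }

    void write(byte[] data) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap(data);
        while (buf.hasRemaining()) {
            channel.write(buf);
        }
    }

    /** Forces everything written so far to disk and returns a handle to resume from. */
    Resumable persist() throws IOException {
        channel.force(true);
        return new Resumable(tempFile, channel.position());
    }

    /** Drops the stream without committing (used here to simulate a failure). */
    void abort() throws IOException {
        channel.close();
    }

    /** Closes the in-progress file and atomically renames it to the target name. */
    void commit() throws IOException {
        channel.force(true);
        channel.close();
        Files.move(tempFile, target, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

For example, writing "abc", persisting, writing "def", failing, recovering, writing "xyz", and committing leaves "abcxyz" in the target file.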
-
createRecoverableWriter
@PublicEvolving public RecoverableWriter createRecoverableWriter(Map<String,String> conf) throws IOException
Description copied from interface: IFileSystem
Creates a new RecoverableWriter. A recoverable writer creates streams that can persist and recover their intermediate state. Persisting and recovering intermediate state is a core building block for writing to files that span multiple checkpoints.
The returned object can act as a shared factory to open and recover multiple streams.
This method is optional on file systems, and various file system implementations may not support this method, throwing an UnsupportedOperationException.
- Specified by:
createRecoverableWriter in interface IFileSystem
- Parameters:
conf - a map that contains a flag indicating whether the writer should not write to local storage, and that can provide further information for instantiating the writer.
- Returns:
A RecoverableWriter for this file system.
- Throws:
IOException - Thrown if the recoverable writer cannot be instantiated.
-
listStatus
public abstract FileStatus[] listStatus(Path f) throws IOException
Description copied from interface: IFileSystem
List the statuses of the files/directories in the given path if the path is a directory.
- Specified by:
listStatus in interface IFileSystem
- Parameters:
f - given path
- Returns:
the statuses of the files/directories in the given path
- Throws:
IOException
-
exists
public boolean exists(Path f) throws IOException
Description copied from interface: IFileSystem
Checks whether the given path exists.
- Specified by:
exists in interface IFileSystem
- Parameters:
f - source file
- Throws:
IOException
-
delete
public abstract boolean delete(Path f, boolean recursive) throws IOException
Description copied from interface: IFileSystem
Delete a file.
- Specified by:
delete in interface IFileSystem
- Parameters:
f - the path to delete
recursive - if the path is a directory and this is set to true, the directory is deleted; otherwise an exception is thrown. For a file, recursive can be set to either true or false.
- Returns:
true if the delete is successful, false otherwise
- Throws:
IOException
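For a local directory tree, the documented recursive flag can be modeled with java.nio.file.Files. This JDK-only sketch (RecursiveDelete.deleteLike is a hypothetical name) deletes a file regardless of the flag, refuses to delete a non-empty directory unless recursive is true, and otherwise removes children before their parents. How a given Flink implementation treats empty directories or missing paths may differ; this sketch simply returns false for a missing path.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical model of delete(Path, boolean recursive) on a local directory tree.
final class RecursiveDelete {
    private RecursiveDelete() {}

    static boolean deleteLike(Path p, boolean recursive) throws IOException {
        if (!Files.exists(p)) {
            return false; // assumption of this sketch: nothing was deleted
        }
        if (!Files.isDirectory(p)) {
            Files.delete(p); // a plain file: the flag does not matter
            return true;
        }
        if (!recursive) {
            // Fails with an IOException (typically DirectoryNotEmptyException)
            // if the directory still has entries.
            Files.delete(p);
            return true;
        }
        // Reverse lexicographic order visits children before their parent directories.
        List<Path> all;
        try (Stream<Path> walk = Files.walk(p)) {
            all = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path q : all) {
            Files.delete(q);
        }
        return true;
    }
}
```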
-
mkdirs
public abstract boolean mkdirs(Path f) throws IOException
Description copied from interface: IFileSystem
Make the given file and all non-existent parents into directories. Has the semantics of Unix 'mkdir -p'. Existence of the directory hierarchy is not an error.
- Specified by:
mkdirs in interface IFileSystem
- Parameters:
f - the directory/directories to be created
- Returns:
true if at least one new directory has been created, false otherwise
- Throws:
IOException - thrown if an I/O error occurs while creating the directory
-
create
public abstract FSDataOutputStream create(Path f, FileSystem.WriteMode overwriteMode) throws IOException
Description copied from interface: IFileSystem
Opens an FSDataOutputStream to a new file at the given path.
If the file already exists, the behavior depends on the given WriteMode. If the mode is set to FileSystem.WriteMode.NO_OVERWRITE, then this method fails with an exception.
- Specified by:
create in interface IFileSystem
- Parameters:
f - The file path to write to
overwriteMode - The action to take if a file or directory already exists at the given path.
- Returns:
The stream to the new file at the target path.
- Throws:
IOException - Thrown if the stream could not be opened because of an I/O error, or because a file already exists at that path and the write mode indicates to not overwrite the file.
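The two write modes can be pictured on a local disk by mapping them onto java.nio.file.StandardOpenOption sets. In this sketch SketchWriteMode is a hypothetical stand-in for FileSystem.WriteMode, the mapping only covers regular files (not an existing directory at the path), and it illustrates the documented behavior rather than how any Flink file system is implemented.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical stand-in for FileSystem.WriteMode.
enum SketchWriteMode { NO_OVERWRITE, OVERWRITE }

final class WriteModeSketch {
    private WriteModeSketch() {}

    static OpenOption[] optionsFor(SketchWriteMode mode) {
        switch (mode) {
            case NO_OVERWRITE:
                // Fails with FileAlreadyExistsException if the file already exists.
                return new OpenOption[] {StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE};
            case OVERWRITE:
                // Creates the file, or truncates an existing one to zero length.
                return new OpenOption[] {StandardOpenOption.CREATE,
                        StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE};
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }

    /** Opens an output stream to the file according to the given mode. */
    static OutputStream create(Path file, SketchWriteMode mode) throws IOException {
        return Files.newOutputStream(file, optionsFor(mode));
    }
}
```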
-
rename
public abstract boolean rename(Path src, Path dst) throws IOException
Description copied from interface: IFileSystem
Renames the file/directory src to dst.
- Specified by:
rename in interface IFileSystem
- Parameters:
src - the file/directory to rename
dst - the new name of the file/directory
- Returns:
true if the renaming was successful, false otherwise
- Throws:
IOException
-
isDistributedFS
public abstract boolean isDistributedFS()
Description copied from interface: IFileSystem
Returns true if this is a distributed file system. A distributed file system here means that the file system is shared among all Flink processes that participate in a cluster or job and that all these processes can see the same files.
- Specified by:
isDistributedFS in interface IFileSystem
- Returns:
True if this is a distributed file system, false otherwise.
-
initOutPathLocalFS
public boolean initOutPathLocalFS(Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory) throws IOException
Description copied from interface: IFileSystem
Initializes output directories on local file systems according to the given write mode.
- WriteMode.NO_OVERWRITE & parallel output:
  - A directory is created if the output path does not exist.
  - An existing directory is reused; files contained in the directory are NOT deleted.
  - An existing file raises an exception.
- WriteMode.NO_OVERWRITE & non-parallel output:
  - An existing file or directory raises an exception.
- WriteMode.OVERWRITE & parallel output:
  - A directory is created if the output path does not exist.
  - An existing directory is reused; files contained in the directory are NOT deleted.
  - An existing file is deleted and replaced by a new directory.
- WriteMode.OVERWRITE & non-parallel output:
  - An existing file or directory (and all its content) is deleted.
Files contained in an existing directory are not deleted, because multiple instances of a DataSinkTask might call this function at the same time and hence might perform concurrent delete operations on the file system (possibly deleting output files of concurrently running tasks). Since concurrent DataSinkTasks are not aware of each other, coordinating delete and create operations would be difficult.
- Specified by:
initOutPathLocalFS in interface IFileSystem
- Parameters:
outPath - Output path that should be prepared.
writeMode - Write mode to consider.
createDirectory - True to initialize a directory at the given path, false to prepare space for a file.
- Returns:
True if the path was successfully prepared, false otherwise.
- Throws:
IOException - Thrown if any of the file system access operations failed.
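The rules above read like a decision table. The following JDK-only sketch encodes them as a pure function that returns the prescribed preparation step without touching any file system. All names (OutPathMode, ExistingEntry, LocalPrepAction, LocalOutPathRules) are hypothetical; "parallel output" is assumed to correspond to createDirectory == true, and the case of a missing path with non-parallel output, which the rules do not mention, is assumed to need no preparation.

```java
// Hypothetical stand-in for FileSystem.WriteMode.
enum OutPathMode { NO_OVERWRITE, OVERWRITE }

// What currently exists at the output path.
enum ExistingEntry { NOTHING, FILE, DIRECTORY }

// Preparation step prescribed by the rules for local file systems.
enum LocalPrepAction {
    CREATE_DIRECTORY,
    REUSE_DIRECTORY_KEEP_FILES,
    DELETE_FILE_THEN_CREATE_DIRECTORY,
    DELETE_EXISTING,
    NOTHING_TO_DO,
    FAIL
}

final class LocalOutPathRules {
    private LocalOutPathRules() {}

    /** parallelOutput is assumed to correspond to createDirectory == true. */
    static LocalPrepAction decide(OutPathMode mode, boolean parallelOutput, ExistingEntry existing) {
        if (parallelOutput) {
            switch (existing) {
                case NOTHING:
                    return LocalPrepAction.CREATE_DIRECTORY;
                case DIRECTORY:
                    // Reused in both modes; files inside are never deleted here.
                    return LocalPrepAction.REUSE_DIRECTORY_KEEP_FILES;
                case FILE:
                    return mode == OutPathMode.OVERWRITE
                            ? LocalPrepAction.DELETE_FILE_THEN_CREATE_DIRECTORY
                            : LocalPrepAction.FAIL;
                default:
                    throw new IllegalArgumentException("unknown state: " + existing);
            }
        }
        if (existing == ExistingEntry.NOTHING) {
            // Not covered by the rules; this sketch assumes nothing needs to be done.
            return LocalPrepAction.NOTHING_TO_DO;
        }
        // Non-parallel output: an existing file or directory either fails or is deleted.
        return mode == OutPathMode.OVERWRITE ? LocalPrepAction.DELETE_EXISTING : LocalPrepAction.FAIL;
    }
}
```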
-
initOutPathDistFS
public boolean initOutPathDistFS(Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory) throws IOException
Description copied from interface: IFileSystem
Initializes output directories on distributed file systems according to the given write mode.
- WriteMode.NO_OVERWRITE & parallel output:
  - A directory is created if the output path does not exist.
  - An existing file or directory raises an exception.
- WriteMode.NO_OVERWRITE & non-parallel output:
  - An existing file or directory raises an exception.
- WriteMode.OVERWRITE & parallel output:
  - A directory is created if the output path does not exist.
  - An existing directory and its content are deleted and a new directory is created.
  - An existing file is deleted and replaced by a new directory.
- WriteMode.OVERWRITE & non-parallel output:
  - An existing file or directory is deleted and replaced by a new directory.
- Specified by:
initOutPathDistFS in interface IFileSystem
- Parameters:
outPath - Output path that should be prepared.
writeMode - Write mode to consider.
createDirectory - True to initialize a directory at the given path, false otherwise.
- Returns:
True if the path was successfully prepared, false otherwise.
- Throws:
IOException - Thrown if any of the file system access operations failed.