@Public public abstract class FileSystem extends Object
Flink implements and supports some file system types directly (for example the default machine-local file system). Other file system types are accessed by an implementation that bridges to the suite of file systems supported by Hadoop (such as for example HDFS).
The purpose of this abstraction is used to expose a common and well defined interface for access to files. This abstraction is used both by Flink's fault tolerance mechanism (storing state and recovery data) and by reusable built-in connectors (file sources / sinks).
The purpose of this abstraction is not to give user programs an abstraction with extreme flexibility and control across all possible file systems. That mission would be a folly, as the differences in characteristics of even the most common file systems are already quite large. It is expected that user programs that need specialized functionality of certain file systems in their functions, operations, sources, or sinks instantiate the specialized file system adapters directly.
The FileSystem's output streams
are used to persistently store
data, both for results of streaming applications and for fault tolerance and recovery. It is
therefore crucial that the persistence semantics of these streams are well defined.
Data written to an output stream is considered persistent, if two requirements are met:
LocalFileSystem
does not provide any durability guarantees for crashes of both
hardware and operating system, while replicated distributed file systems (like HDFS)
typically guarantee durability in the presence of at most n concurrent node
failures, where n is the replication factor.
Updates to the file's parent directory (such that the file shows up when listing the directory contents) are not required to be complete for the data in the file stream to be considered persistent. This relaxation is important for file systems where updates to directory contents are only eventually consistent.
The FSDataOutputStream
has to guarantee data persistence for the written bytes once
the call to FSDataOutputStream.close()
returns.
For fault-tolerant distributed file systems, data is considered persistent once it has been received and acknowledged by the file system, typically by having been replicated to a quorum of machines (durability requirement). In addition the absolute file path must be visible to all other machines that will potentially access the file (visibility requirement).
Whether data has hit non-volatile storage on the storage nodes depends on the specific guarantees of the particular file system.
The metadata updates to the file's parent directory are not required to have reached a consistent state. It is permissible that some machines see the file when listing the parent directory's contents while others do not, as long as access to the file by its absolute path is possible on all nodes.
A local file system must support the POSIX close-to-open semantics. Because the local file system does not have any fault tolerance guarantees, no further requirements exist.
The above implies specifically that data may still be in the OS cache when considered persistent from the local file system's perspective. Crashes that cause the OS cache to loose data are considered fatal to the local machine and are not covered by the local file system's guarantees as defined by Flink.
That means that computed results, checkpoints, and savepoints that are written only to the local filesystem are not guaranteed to be recoverable from the local machine's failure, making local file systems unsuitable for production setups.
Many file systems either do not support overwriting contents of existing files at all, or do not support consistent visibility of the updated contents in that case. For that reason, Flink's FileSystem does not support appending to existing files, or seeking within output streams so that previously written data could be overwritten.
Overwriting files is in general possible. A file is overwritten by deleting it and creating a new file. However, certain filesystems cannot make that change synchronously visible to all parties that have access to the file. For example Amazon S3 guarantees only eventual consistency in the visibility of the file replacement: Some machines may see the old file, some machines may see the new file.
To avoid these consistency issues, the implementations of failure/recovery mechanisms in Flink strictly avoid writing to the same file path more than once.
Implementations of FileSystem
must be thread-safe: The same instance of FileSystem is
frequently shared across multiple threads in Flink and must be able to concurrently create
input/output streams and list file metadata.
The FSDataOutputStream
and FSDataOutputStream
implementations are strictly
not thread-safe. Instances of the streams should also not be passed between threads in
between read or write operations, because there are no guarantees about the visibility of
operations across threads (many operations do not create memory fences).
When application code obtains a FileSystem (via get(URI)
or via Path.getFileSystem()
), the FileSystem instantiates a safety net for that FileSystem. The safety
net ensures that all streams created from the FileSystem are closed when the application task
finishes (or is canceled or failed). That way, the task's threads do not leak connections.
Internal runtime code can explicitly obtain a FileSystem that does not use the safety net via
getUnguardedFileSystem(URI)
.
FSDataInputStream
,
FSDataOutputStream
Modifier and Type | Class and Description |
---|---|
static class |
FileSystem.WriteMode
The possible write modes.
|
Constructor and Description |
---|
FileSystem() |
Modifier and Type | Method and Description |
---|---|
FSDataOutputStream |
create(Path f,
boolean overwrite)
Deprecated.
Use
create(Path, WriteMode) instead. |
FSDataOutputStream |
create(Path f,
boolean overwrite,
int bufferSize,
short replication,
long blockSize)
Deprecated.
Deprecated because not well supported across types of file systems. Control the
behavior of specific file systems via configurations instead.
|
abstract FSDataOutputStream |
create(Path f,
FileSystem.WriteMode overwriteMode)
Opens an FSDataOutputStream to a new file at the given path.
|
RecoverableWriter |
createRecoverableWriter()
Creates a new
RecoverableWriter . |
abstract boolean |
delete(Path f,
boolean recursive)
Delete a file.
|
boolean |
exists(Path f)
Check if exists.
|
static FileSystem |
get(URI uri)
Returns a reference to the
FileSystem instance for accessing the file system
identified by the given URI . |
long |
getDefaultBlockSize()
Deprecated.
This value is no longer used and is meaningless.
|
static URI |
getDefaultFsUri()
Gets the default file system URI that is used for paths and file systems that do not specify
and explicit scheme.
|
abstract BlockLocation[] |
getFileBlockLocations(FileStatus file,
long start,
long len)
Return an array containing hostnames, offset and size of portions of the given file.
|
abstract FileStatus |
getFileStatus(Path f)
Return a file status object that represents the path.
|
abstract Path |
getHomeDirectory()
Returns the path of the user's home directory in this file system.
|
abstract FileSystemKind |
getKind()
Deprecated.
this method is not used anymore.
|
static FileSystem |
getLocalFileSystem()
Returns a reference to the
FileSystem instance for accessing the local file system. |
static FileSystem |
getUnguardedFileSystem(URI fsUri) |
abstract URI |
getUri()
Returns a URI whose scheme and authority identify this file system.
|
abstract Path |
getWorkingDirectory()
Returns the path of the file system's current working directory.
|
static void |
initialize(Configuration config)
Deprecated.
use
initialize(Configuration, PluginManager) instead. |
static void |
initialize(Configuration config,
PluginManager pluginManager)
Initializes the shared file system settings.
|
boolean |
initOutPathDistFS(Path outPath,
FileSystem.WriteMode writeMode,
boolean createDirectory)
Initializes output directories on distributed file systems according to the given write mode.
|
boolean |
initOutPathLocalFS(Path outPath,
FileSystem.WriteMode writeMode,
boolean createDirectory)
Initializes output directories on local file systems according to the given write mode.
|
abstract boolean |
isDistributedFS()
Returns true if this is a distributed file system.
|
abstract FileStatus[] |
listStatus(Path f)
List the statuses of the files/directories in the given path if the path is a directory.
|
abstract boolean |
mkdirs(Path f)
Make the given file and all non-existent parents into directories.
|
abstract FSDataInputStream |
open(Path f)
Opens an FSDataInputStream at the indicated Path.
|
abstract FSDataInputStream |
open(Path f,
int bufferSize)
Opens an FSDataInputStream at the indicated Path.
|
abstract boolean |
rename(Path src,
Path dst)
Renames the file/directory src to dst.
|
@Deprecated public static void initialize(Configuration config) throws IllegalConfigurationException
initialize(Configuration, PluginManager)
instead.The given configuration is passed to each file system factory to initialize the respective file systems. Because the configuration of file systems may be different subsequent to the call of this method, this method clears the file system instance cache.
This method also reads the default file system URI from the configuration key CoreOptions.DEFAULT_FILESYSTEM_SCHEME
. All calls to get(URI)
where the
URI has no scheme will be interpreted as relative to that URI. As an example, assume the
default file system URI is set to 'hdfs://localhost:9000/'
. A file path of '/user/USERNAME/in.txt'
is interpreted as 'hdfs://localhost:9000/user/USERNAME/in.txt'
.
config
- the configuration from where to fetch the parameter.IllegalConfigurationException
public static void initialize(Configuration config, PluginManager pluginManager) throws IllegalConfigurationException
The given configuration is passed to each file system factory to initialize the respective file systems. Because the configuration of file systems may be different subsequent to the call of this method, this method clears the file system instance cache.
This method also reads the default file system URI from the configuration key CoreOptions.DEFAULT_FILESYSTEM_SCHEME
. All calls to get(URI)
where the
URI has no scheme will be interpreted as relative to that URI. As an example, assume the
default file system URI is set to 'hdfs://localhost:9000/'
. A file path of '/user/USERNAME/in.txt'
is interpreted as 'hdfs://localhost:9000/user/USERNAME/in.txt'
.
config
- the configuration from where to fetch the parameter.pluginManager
- optional plugin manager that is used to initialized filesystems provided
as plugins.IllegalConfigurationException
public static FileSystem getLocalFileSystem()
FileSystem
instance for accessing the local file system.FileSystem
instance for accessing the local file system.public static FileSystem get(URI uri) throws IOException
FileSystem
instance for accessing the file system
identified by the given URI
.uri
- the URI
identifying the file systemFileSystem
instance for accessing the file system
identified by the given URI
.IOException
- thrown if a reference to the file system instance could not be obtained@Internal public static FileSystem getUnguardedFileSystem(URI fsUri) throws IOException
IOException
public static URI getDefaultFsUri()
As an example, assume the default file system URI is set to 'hdfs://someserver:9000/'
. A file path of '/user/USERNAME/in.txt'
is interpreted as
'hdfs://someserver:9000/user/USERNAME/in.txt'
.
public abstract Path getWorkingDirectory()
public abstract Path getHomeDirectory()
public abstract URI getUri()
public abstract FileStatus getFileStatus(Path f) throws IOException
f
- The path we want information fromFileNotFoundException
- when the path does not exist; IOException see specific
implementationIOException
public abstract BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException
IOException
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException
f
- the file name to openbufferSize
- the size of the buffer to be used.IOException
public abstract FSDataInputStream open(Path f) throws IOException
f
- the file to openIOException
public RecoverableWriter createRecoverableWriter() throws IOException
RecoverableWriter
. A recoverable writer creates streams that can
persist and recover their intermediate state. Persisting and recovering intermediate state is
a core building block for writing to files that span multiple checkpoints.
The returned object can act as a shared factory to open and recover multiple streams.
This method is optional on file systems and various file system implementations may not
support this method, throwing an UnsupportedOperationException
.
IOException
- Thrown, if the recoverable writer cannot be instantiated.@Deprecated public long getDefaultBlockSize()
public abstract FileStatus[] listStatus(Path f) throws IOException
f
- given pathIOException
public boolean exists(Path f) throws IOException
f
- source fileIOException
public abstract boolean delete(Path f, boolean recursive) throws IOException
f
- the path to deleterecursive
- if path is a directory and set to true
, the directory is
deleted else throws an exception. In case of a file the recursive can be set to either
true
or false
true
if delete is successful, false
otherwiseIOException
public abstract boolean mkdirs(Path f) throws IOException
f
- the directory/directories to be createdtrue
if at least one new directory has been created, false
otherwiseIOException
- thrown if an I/O error occurs while creating the directory@Deprecated public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException
This method is deprecated, because most of its parameters are ignored by most file systems. To control for example the replication factor and block size in the Hadoop Distributed File system, make sure that the respective Hadoop configuration file is either linked from the Flink configuration, or in the classpath of either Flink or the user code.
f
- the file name to openoverwrite
- if a file with this name already exists, then if true, the file will be
overwritten, and if false an error will be thrown.bufferSize
- the size of the buffer to be used.replication
- required block replication for the file.blockSize
- the size of the file blocksIOException
- Thrown, if the stream could not be opened because of an I/O, or because a
file already exists at that path and the write mode indicates to not overwrite the file.@Deprecated public FSDataOutputStream create(Path f, boolean overwrite) throws IOException
create(Path, WriteMode)
instead.f
- the file name to openoverwrite
- if a file with this name already exists, then if true, the file will be
overwritten, and if false an error will be thrown.IOException
- Thrown, if the stream could not be opened because of an I/O, or because a
file already exists at that path and the write mode indicates to not overwrite the file.public abstract FSDataOutputStream create(Path f, FileSystem.WriteMode overwriteMode) throws IOException
If the file already exists, the behavior depends on the given WriteMode
. If the
mode is set to FileSystem.WriteMode.NO_OVERWRITE
, then this method fails with an exception.
f
- The file path to write tooverwriteMode
- The action to take if a file or directory already exists at the given
path.IOException
- Thrown, if the stream could not be opened because of an I/O, or because a
file already exists at that path and the write mode indicates to not overwrite the file.public abstract boolean rename(Path src, Path dst) throws IOException
src
- the file/directory to renamedst
- the new name of the file/directorytrue
if the renaming was successful, false
otherwiseIOException
public abstract boolean isDistributedFS()
@Deprecated public abstract FileSystemKind getKind()
public boolean initOutPathLocalFS(Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory) throws IOException
Files contained in an existing directory are not deleted, because multiple instances of a DataSinkTask might call this function at the same time and hence might perform concurrent delete operations on the file system (possibly deleting output files of concurrently running tasks). Since concurrent DataSinkTasks are not aware of each other, coordination of delete and create operations would be difficult.
outPath
- Output path that should be prepared.writeMode
- Write mode to consider.createDirectory
- True, to initialize a directory at the given path, false to prepare
space for a file.IOException
- Thrown, if any of the file system access operations failed.public boolean initOutPathDistFS(Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory) throws IOException
WriteMode.NO_OVERWRITE & parallel output: - A directory is created if the output path does not exist. - An existing file or directory raises an exception.
WriteMode.NO_OVERWRITE & NONE parallel output: - An existing file or directory raises an exception.
WriteMode.OVERWRITE & parallel output: - A directory is created if the output path does not exist. - An existing directory and its content is deleted and a new directory is created. - An existing file is deleted and replaced by a new directory.
WriteMode.OVERWRITE & NONE parallel output: - An existing file or directory is deleted and replaced by a new directory.
outPath
- Output path that should be prepared.writeMode
- Write mode to consider.createDirectory
- True, to initialize a directory at the given path, false otherwise.IOException
- Thrown, if any of the file system access operations failed.Copyright © 2014–2021 The Apache Software Foundation. All rights reserved.