Class FileSystem

  • All Implemented Interfaces:
    IFileSystem
    Direct Known Subclasses:
    ForStFlinkFileSystem, HadoopFileSystem, LimitedConnectionsFileSystem, LocalFileSystem, SafetyNetWrapperFileSystem

    @Public
    public abstract class FileSystem
    extends Object
    implements IFileSystem
    Abstract base class of all file systems used by Flink. This class may be extended to implement distributed file systems or local file systems. The abstraction provided by this file system is very simple, and the set of available operations is quite limited, so that it covers the common denominator of a wide range of file systems. For example, appending to or mutating existing files is not supported.

    Flink implements and supports some file system types directly (for example the default machine-local file system). Other file system types are accessed by an implementation that bridges to the suite of file systems supported by Hadoop (for example HDFS).

    Scope and Purpose

    The purpose of this abstraction is to expose a common and well-defined interface for accessing files. This abstraction is used both by Flink's fault tolerance mechanism (storing state and recovery data) and by reusable built-in connectors (file sources / sinks).

    The purpose of this abstraction is not to give user programs an abstraction with extreme flexibility and control across all possible file systems. That mission would be folly, as even the most common file systems already differ considerably in their characteristics. User programs that need specialized functionality of certain file systems in their functions, operations, sources, or sinks are expected to instantiate the specialized file system adapters directly.

    Data Persistence Contract

    The FileSystem's output streams are used to persistently store data, both for results of streaming applications and for fault tolerance and recovery. It is therefore crucial that the persistence semantics of these streams are well defined.

    Definition of Persistence Guarantees

    Data written to an output stream is considered persistent if two requirements are met:

    1. Visibility Requirement: It must be guaranteed that all other processes, machines, virtual machines, containers, etc. that are able to access the file see the data consistently when given the absolute file path. This requirement is similar to the close-to-open semantics defined by POSIX, but restricted to the file itself (by its absolute path).
    2. Durability Requirement: The durability/persistence requirements of the particular file system must be met. For example, the LocalFileSystem does not provide any durability guarantees against hardware or operating system crashes, while replicated distributed file systems (like HDFS) typically guarantee durability in the presence of up to n - 1 concurrent node failures, where n is the replication factor.

    Updates to the file's parent directory (such that the file shows up when listing the directory contents) are not required to be complete for the data in the file stream to be considered persistent. This relaxation is important for file systems where updates to directory contents are only eventually consistent.

    The FSDataOutputStream has to guarantee data persistence for the written bytes once the call to FSDataOutputStream.close() returns.
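    To make the contract concrete, the following sketch shows one way a stream writing to a single local disk could satisfy its durability side: close() forces the file's data and metadata to the storage device before releasing the channel. ForcingOutputStream is a hypothetical class built only on java.nio; it is not part of Flink, and Flink's local file system is not required to behave this way (see the local file system example).

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical stream: written bytes are durable once close() returns. */
final class ForcingOutputStream extends OutputStream {
    private final FileChannel channel;

    ForcingOutputStream(Path file) throws IOException {
        // CREATE_NEW: fail instead of silently overwriting an existing file.
        this.channel = FileChannel.open(file, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
    }

    @Override
    public void write(int b) throws IOException {
        write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap(b, off, len);
        while (buf.hasRemaining()) {
            channel.write(buf); // a single write may be partial, so loop until done
        }
    }

    @Override
    public void close() throws IOException {
        if (!channel.isOpen()) {
            return; // closing twice is a no-op
        }
        try {
            channel.force(true); // push data and metadata to the storage device
        } finally {
            channel.close();
        }
    }
}
```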

    Examples

    Fault-tolerant distributed file systems

    For fault-tolerant distributed file systems, data is considered persistent once it has been received and acknowledged by the file system, typically by having been replicated to a quorum of machines (durability requirement). In addition the absolute file path must be visible to all other machines that will potentially access the file (visibility requirement).

    Whether data has hit non-volatile storage on the storage nodes depends on the specific guarantees of the particular file system.

    The metadata updates to the file's parent directory are not required to have reached a consistent state. It is permissible that some machines see the file when listing the parent directory's contents while others do not, as long as access to the file by its absolute path is possible on all nodes.

    Local file systems

    A local file system must support the POSIX close-to-open semantics. Because the local file system does not have any fault tolerance guarantees, no further requirements exist.

    The above implies specifically that data may still be in the OS cache when considered persistent from the local file system's perspective. Crashes that cause the OS cache to lose data are considered fatal to the local machine and are not covered by the local file system's guarantees as defined by Flink.

    That means that computed results, checkpoints, and savepoints that are written only to the local filesystem are not guaranteed to be recoverable from the local machine's failure, making local file systems unsuitable for production setups.

    Updating File Contents

    Many file systems either do not support overwriting contents of existing files at all, or do not support consistent visibility of the updated contents in that case. For that reason, Flink's FileSystem does not support appending to existing files, or seeking within output streams so that previously written data could be overwritten.

    Overwriting Files

    Overwriting files is in general possible. A file is overwritten by deleting it and creating a new file. However, certain file systems cannot make that change synchronously visible to all parties that have access to the file. For example, Amazon S3 historically guaranteed only eventual consistency in the visibility of the file replacement: some machines could see the old file while others saw the new one. (S3 has provided strong read-after-write consistency since December 2020.)

    To avoid these consistency issues, the implementations of failure/recovery mechanisms in Flink strictly avoid writing to the same file path more than once.
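    A minimal sketch of this write-once discipline, assuming a hypothetical naming scheme (prefix, attempt number, random UUID) rather than Flink's actual file layout: every write goes to a freshly generated path, and the CREATE_NEW open option turns an accidental second write to an existing path into an error instead of a silent overwrite.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

final class WriteOnceFiles {
    /** Hypothetical naming scheme: every call yields a path that has never been used. */
    static Path freshPath(Path dir, String prefix, long attempt) {
        return dir.resolve(prefix + "-" + attempt + "-" + UUID.randomUUID());
    }

    /** Writes data to a new file; throws FileAlreadyExistsException if the path is taken. */
    static Path writeOnce(Path dir, String prefix, long attempt, String data) throws IOException {
        Path target = freshPath(dir, prefix, attempt);
        Files.write(target, data.getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
        return target;
    }
}
```

Because a retry gets a new path instead of replacing the old file, readers never depend on how quickly a replacement becomes visible.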

    Thread Safety

    Implementations of FileSystem must be thread-safe: The same instance of FileSystem is frequently shared across multiple threads in Flink and must be able to concurrently create input/output streams and list file metadata.

    The FSDataInputStream and FSDataOutputStream implementations are strictly not thread-safe. Instances of the streams should also not be passed between threads in between read or write operations, because there are no guarantees about the visibility of operations across threads (many operations do not create memory fences).
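    The following sketch illustrates the usage pattern these rules imply, with java.nio standing in for a Flink FileSystem (an assumption made so the example runs without Flink): a shared, thread-safe entry point is used concurrently, while each stream is opened, written, and closed by a single thread and never handed to another. ConfinedStreams is a hypothetical name.

```java
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ConfinedStreams {
    /** Each task creates, writes, and closes its own stream; no stream crosses threads. */
    static List<Path> writeInParallel(Path dir, int tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<Path>> results = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                final int id = i;
                results.add(pool.submit(() -> {
                    Path file = dir.resolve("part-" + id);
                    // The stream is a local variable, so it stays confined to this thread.
                    try (OutputStream out = Files.newOutputStream(file)) {
                        out.write(("task " + id).getBytes(StandardCharsets.UTF_8));
                    }
                    return file;
                }));
            }
            List<Path> files = new ArrayList<>();
            for (Future<Path> f : results) {
                // Future.get() makes the task's actions visible to this thread.
                files.add(f.get());
            }
            return files;
        } finally {
            pool.shutdown();
        }
    }
}
```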

    Streams Safety Net

    When application code obtains a FileSystem (via get(URI) or via Path.getFileSystem()), the FileSystem instantiates a safety net for that FileSystem. The safety net ensures that all streams created from the FileSystem are closed when the application task finishes (or is canceled or failed). That way, the task's threads do not leak connections.

    Internal runtime code can explicitly obtain a FileSystem that does not use the safety net via getUnguardedFileSystem(URI).
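    The safety-net idea can be modeled in plain Java as a per-task registry that tracks each stream it hands out and closes whatever is still open when the task ends. This is a simplified sketch under stated assumptions, not the implementation behind SafetyNetWrapperFileSystem; TaskStreamRegistry and its methods are hypothetical.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

/** Hypothetical per-task registry that closes leaked streams when the task finishes. */
final class TaskStreamRegistry implements Closeable {
    private final Object lock = new Object();
    private final Set<Closeable> open = Collections.newSetFromMap(new IdentityHashMap<>());
    private boolean closed;

    /** Tracks the stream; the returned handle unregisters it on a regular close. */
    Closeable register(Closeable stream) throws IOException {
        synchronized (lock) {
            if (!closed) {
                open.add(stream);
                return () -> {
                    synchronized (lock) {
                        open.remove(stream);
                    }
                    stream.close();
                };
            }
        }
        // The task has already ended: do not let the new stream escape.
        stream.close();
        throw new IOException("Task already finished; the stream was closed immediately");
    }

    int openCount() {
        synchronized (lock) {
            return open.size();
        }
    }

    /** Called when the task finishes, is canceled, or fails: closes all leaked streams. */
    @Override
    public void close() throws IOException {
        Closeable[] leaked;
        synchronized (lock) {
            closed = true;
            leaked = open.toArray(new Closeable[0]);
            open.clear();
        }
        IOException first = null;
        for (Closeable c : leaked) {
            try {
                c.close();
            } catch (IOException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```

A task would route every stream through register(...) and close the registry in its cleanup path, regardless of how the task ended.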

    See Also:
    FSDataInputStream, FSDataOutputStream
    • Constructor Detail

      • FileSystem

        public FileSystem()
    • Method Detail

      • initialize

        @Deprecated
        public static void initialize(Configuration config)
                               throws IllegalConfigurationException
        Initializes the shared file system settings.

        The given configuration is passed to each file system factory to initialize the respective file systems. Because the configuration of file systems may differ after this method is called, this method clears the file system instance cache.

        This method also reads the default file system URI from the configuration key CoreOptions.DEFAULT_FILESYSTEM_SCHEME. All calls to get(URI) where the URI has no scheme will be interpreted as relative to that URI. As an example, assume the default file system URI is set to 'hdfs://localhost:9000/'. A file path of '/user/USERNAME/in.txt' is interpreted as 'hdfs://localhost:9000/user/USERNAME/in.txt'.

        Parameters:
        config - the configuration from where to fetch the parameter.
        Throws:
        IllegalConfigurationException
      • initialize

        public static void initialize(Configuration config,
                                      @Nullable
                                      PluginManager pluginManager)
                               throws IllegalConfigurationException
        Initializes the shared file system settings.

        The given configuration is passed to each file system factory to initialize the respective file systems. Because the configuration of file systems may differ after this method is called, this method clears the file system instance cache.

        This method also reads the default file system URI from the configuration key CoreOptions.DEFAULT_FILESYSTEM_SCHEME. All calls to get(URI) where the URI has no scheme will be interpreted as relative to that URI. As an example, assume the default file system URI is set to 'hdfs://localhost:9000/'. A file path of '/user/USERNAME/in.txt' is interpreted as 'hdfs://localhost:9000/user/USERNAME/in.txt'.

        Parameters:
        config - the configuration from where to fetch the parameter.
        pluginManager - optional plugin manager that is used to initialize file systems provided as plugins.
        Throws:
        IllegalConfigurationException
      • getLocalFileSystem

        public static FileSystem getLocalFileSystem()
        Returns a reference to the FileSystem instance for accessing the local file system.
        Returns:
        a reference to the FileSystem instance for accessing the local file system.
      • get

        public static FileSystem get(URI uri)
                              throws IOException
        Returns a reference to the FileSystem instance for accessing the file system identified by the given URI.
        Parameters:
        uri - the URI identifying the file system
        Returns:
        a reference to the FileSystem instance for accessing the file system identified by the given URI.
        Throws:
        IOException - thrown if a reference to the file system instance could not be obtained
      • getDefaultFsUri

        public static URI getDefaultFsUri()
        Gets the default file system URI that is used for paths and file systems that do not specify an explicit scheme.

        As an example, assume the default file system URI is set to 'hdfs://someserver:9000/'. A file path of '/user/USERNAME/in.txt' is interpreted as 'hdfs://someserver:9000/user/USERNAME/in.txt'.

        Returns:
        The default file system URI
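        A minimal sketch of the resolution rule described above, assuming java.net.URI.resolve semantics are an acceptable stand-in: a URI without a scheme inherits the scheme and authority of the default file system URI, while a URI that already names a scheme is returned unchanged. DefaultFsResolver is a hypothetical helper; Flink's own path handling may cover further cases (for example relative paths) that this sketch ignores.

```java
import java.net.URI;

final class DefaultFsResolver {
    /** Hypothetical helper: qualifies scheme-less URIs against the default file system URI. */
    static URI qualify(URI defaultFsUri, URI uri) {
        if (uri.getScheme() != null) {
            return uri; // an explicit scheme such as file:// is kept as-is
        }
        // An absolute path keeps its path but takes scheme and authority from the base URI.
        return defaultFsUri.resolve(uri);
    }
}
```

With the default URI 'hdfs://someserver:9000/', qualifying '/user/USERNAME/in.txt' yields 'hdfs://someserver:9000/user/USERNAME/in.txt', matching the example above.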
      • create

        @Deprecated
        public FSDataOutputStream create(Path f,
                                         boolean overwrite,
                                         int bufferSize,
                                         short replication,
                                         long blockSize)
                                  throws IOException
        Deprecated.
        Deprecated because not well supported across types of file systems. Control the behavior of specific file systems via configurations instead.
        Opens an FSDataOutputStream at the indicated Path.

        This method is deprecated because most of its parameters are ignored by most file systems. To control, for example, the replication factor and block size in the Hadoop Distributed File System, make sure that the respective Hadoop configuration file is either linked from the Flink configuration or on the classpath of either Flink or the user code.

        Parameters:
        f - the file name to open
        overwrite - if a file with this name already exists: if true, the file is overwritten; if false, an error is thrown.
        bufferSize - the size of the buffer to be used.
        replication - required block replication for the file.
        blockSize - the size of the file blocks
        Throws:
        IOException - Thrown if the stream could not be opened because of an I/O error, or because a file already exists at that path and the write mode indicates not to overwrite the file.
      • create

        @Deprecated
        public FSDataOutputStream create(Path f,
                                         boolean overwrite)
                                  throws IOException
        Deprecated.
        Opens an FSDataOutputStream at the indicated Path.
        Parameters:
        f - the file name to open
        overwrite - if a file with this name already exists: if true, the file is overwritten; if false, an error is thrown.
        Throws:
        IOException - Thrown if the stream could not be opened because of an I/O error, or because a file already exists at that path and the write mode indicates not to overwrite the file.
      • getDefaultBlockSize

        @Deprecated
        public long getDefaultBlockSize()
        Deprecated.
        This value is no longer used and is meaningless.
        Returns the number of bytes that large input files should optimally be split into to minimize I/O time.
        Returns:
        the number of bytes that large input files should optimally be split into to minimize I/O time
      • getWorkingDirectory

        public abstract Path getWorkingDirectory()
        Description copied from interface: IFileSystem
        Returns the path of the file system's current working directory.
        Specified by:
        getWorkingDirectory in interface IFileSystem
        Returns:
        the path of the file system's current working directory
      • getHomeDirectory

        public abstract Path getHomeDirectory()
        Description copied from interface: IFileSystem
        Returns the path of the user's home directory in this file system.
        Specified by:
        getHomeDirectory in interface IFileSystem
        Returns:
        the path of the user's home directory in this file system.
      • getUri

        public abstract URI getUri()
        Description copied from interface: IFileSystem
        Returns a URI whose scheme and authority identify this file system.
        Specified by:
        getUri in interface IFileSystem
        Returns:
        a URI whose scheme and authority identify this file system
      • getFileBlockLocations

        public abstract BlockLocation[] getFileBlockLocations(FileStatus file,
                                                              long start,
                                                              long len)
                                                       throws IOException
        Description copied from interface: IFileSystem
        Returns an array containing hostnames, offset and size of portions of the given file. For a nonexistent file or region, null is returned. This call is most helpful with DFS, where it returns the hostnames of machines that contain the given file. The FileSystem will simply return an element containing 'localhost'.
        Specified by:
        getFileBlockLocations in interface IFileSystem
        Throws:
        IOException
      • open

        public abstract FSDataInputStream open(Path f,
                                               int bufferSize)
                                        throws IOException
        Description copied from interface: IFileSystem
        Opens an FSDataInputStream at the indicated Path.
        Specified by:
        open in interface IFileSystem
        Parameters:
        f - the file name to open
        bufferSize - the size of the buffer to be used.
        Throws:
        IOException
      • createRecoverableWriter

        public RecoverableWriter createRecoverableWriter()
                                                  throws IOException
        Description copied from interface: IFileSystem
        Creates a new RecoverableWriter. A recoverable writer creates streams that can persist and recover their intermediate state. Persisting and recovering intermediate state is a core building block for writing to files that span multiple checkpoints.

        The returned object can act as a shared factory to open and recover multiple streams.

        This method is optional: some file system implementations do not support it and throw an UnsupportedOperationException.

        Specified by:
        createRecoverableWriter in interface IFileSystem
        Returns:
        A RecoverableWriter for this file system.
        Throws:
        IOException - Thrown, if the recoverable writer cannot be instantiated.
      • createRecoverableWriter

        @PublicEvolving
        public RecoverableWriter createRecoverableWriter(Map<String,String> conf)
                                                  throws IOException
        Description copied from interface: IFileSystem
        Creates a new RecoverableWriter. A recoverable writer creates streams that can persist and recover their intermediate state. Persisting and recovering intermediate state is a core building block for writing to files that span multiple checkpoints.

        The returned object can act as a shared factory to open and recover multiple streams.

        This method is optional: some file system implementations do not support it and throw an UnsupportedOperationException.

        Specified by:
        createRecoverableWriter in interface IFileSystem
        Parameters:
        conf - a map that contains a flag indicating whether the writer should not write to local storage, and that can provide further information for instantiating the writer.
        Returns:
        A RecoverableWriter for this file system.
        Throws:
        IOException - Thrown, if the recoverable writer cannot be instantiated.
      • listStatus

        public abstract FileStatus[] listStatus(Path f)
                                         throws IOException
        Description copied from interface: IFileSystem
        List the statuses of the files/directories in the given path if the path is a directory.
        Specified by:
        listStatus in interface IFileSystem
        Parameters:
        f - given path
        Returns:
        the statuses of the files/directories in the given path
        Throws:
        IOException
      • delete

        public abstract boolean delete(Path f,
                                       boolean recursive)
                                throws IOException
        Description copied from interface: IFileSystem
        Delete a file.
        Specified by:
        delete in interface IFileSystem
        Parameters:
        f - the path to delete
        recursive - if the path is a directory and this flag is true, the directory is deleted; otherwise an exception is thrown. If the path is a file, the flag can be either true or false.
        Returns:
        true if delete is successful, false otherwise
        Throws:
        IOException
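        As an illustration of the recursive flag, the sketch below implements these semantics on java.nio.file for a local directory tree. DeleteSketch is hypothetical, and returning false for a path that does not exist is an assumption of this sketch rather than a documented rule.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class DeleteSketch {
    static boolean delete(Path path, boolean recursive) throws IOException {
        if (!Files.exists(path, LinkOption.NOFOLLOW_LINKS)) {
            return false; // assumption of this sketch: nothing was deleted
        }
        if (!Files.isDirectory(path, LinkOption.NOFOLLOW_LINKS)) {
            Files.delete(path); // a file is deleted whatever the flag says
            return true;
        }
        if (!recursive) {
            throw new IOException("Directory " + path + " can only be deleted with recursive = true");
        }
        // In reverse order every entry precedes its parent, so directories are empty when deleted.
        try (Stream<Path> walk = Files.walk(path)) {
            List<Path> entries = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            for (Path p : entries) {
                Files.delete(p);
            }
        }
        return true;
    }
}
```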
      • mkdirs

        public abstract boolean mkdirs(Path f)
                                throws IOException
        Description copied from interface: IFileSystem
        Make the given file and all non-existent parents into directories. Has the semantics of Unix 'mkdir -p'. Existence of the directory hierarchy is not an error.
        Specified by:
        mkdirs in interface IFileSystem
        Parameters:
        f - the directory/directories to be created
        Returns:
        true if at least one new directory has been created, false otherwise
        Throws:
        IOException - thrown if an I/O error occurs while creating the directory
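        The 'mkdir -p' contract, including its return value, can be sketched with java.nio.file.Files.createDirectories, which already treats an existing directory hierarchy as success. MkdirsSketch is hypothetical, and because the existence check is not atomic with the creation, the returned flag is only indicative when several callers race on the same path.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class MkdirsSketch {
    /** Returns true if at least one directory was created, false if the hierarchy already existed. */
    static boolean mkdirs(Path dir) throws IOException {
        boolean existed = Files.isDirectory(dir);
        // Creates all missing parents; throws an IOException if a regular file is in the way.
        Files.createDirectories(dir);
        return !existed;
    }
}
```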
      • create

        public abstract FSDataOutputStream create(Path f,
                                                  FileSystem.WriteMode overwriteMode)
                                           throws IOException
        Description copied from interface: IFileSystem
        Opens an FSDataOutputStream to a new file at the given path.

        If the file already exists, the behavior depends on the given WriteMode. If the mode is set to FileSystem.WriteMode.NO_OVERWRITE, then this method fails with an exception.

        Specified by:
        create in interface IFileSystem
        Parameters:
        f - The file path to write to
        overwriteMode - The action to take if a file or directory already exists at the given path.
        Returns:
        The stream to the new file at the target path.
        Throws:
        IOException - Thrown if the stream could not be opened because of an I/O error, or because a file already exists at that path and the write mode indicates not to overwrite the file.
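        For orientation, the two write modes correspond roughly to java.nio open options: NO_OVERWRITE behaves like CREATE_NEW, and OVERWRITE like CREATE combined with TRUNCATE_EXISTING. The sketch mirrors FileSystem.WriteMode in a local enum so that it compiles without Flink; the mapping illustrates the semantics on a local disk and is not taken from any Flink implementation.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class CreateSketch {
    /** Local stand-in for FileSystem.WriteMode. */
    enum WriteMode { NO_OVERWRITE, OVERWRITE }

    static OutputStream create(Path file, WriteMode mode) throws IOException {
        OpenOption[] options = mode == WriteMode.NO_OVERWRITE
                // Fails with FileAlreadyExistsException if something already exists at the path.
                ? new OpenOption[] {StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE}
                // Creates the file if needed, otherwise discards its previous contents.
                : new OpenOption[] {StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE};
        return Files.newOutputStream(file, options);
    }
}
```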
      • rename

        public abstract boolean rename(Path src,
                                       Path dst)
                                throws IOException
        Description copied from interface: IFileSystem
        Renames the file/directory src to dst.
        Specified by:
        rename in interface IFileSystem
        Parameters:
        src - the file/directory to rename
        dst - the new name of the file/directory
        Returns:
        true if the renaming was successful, false otherwise
        Throws:
        IOException
      • isDistributedFS

        public abstract boolean isDistributedFS()
        Description copied from interface: IFileSystem
        Returns true if this is a distributed file system. A distributed file system here means that the file system is shared among all Flink processes that participate in a cluster or job and that all these processes can see the same files.
        Specified by:
        isDistributedFS in interface IFileSystem
        Returns:
        True, if this is a distributed file system, false otherwise.
      • initOutPathLocalFS

        public boolean initOutPathLocalFS(Path outPath,
                                          FileSystem.WriteMode writeMode,
                                          boolean createDirectory)
                                   throws IOException
        Description copied from interface: IFileSystem
        Initializes output directories on local file systems according to the given write mode.
        • WriteMode.NO_OVERWRITE & parallel output:
          • A directory is created if the output path does not exist.
          • An existing directory is reused, files contained in the directory are NOT deleted.
          • An existing file raises an exception.
        • WriteMode.NO_OVERWRITE & non-parallel output:
          • An existing file or directory raises an exception.
        • WriteMode.OVERWRITE & parallel output:
          • A directory is created if the output path does not exist.
          • An existing directory is reused, files contained in the directory are NOT deleted.
          • An existing file is deleted and replaced by a new directory.
        • WriteMode.OVERWRITE & non-parallel output:
          • An existing file or directory (and all its content) is deleted.

        Files contained in an existing directory are not deleted, because multiple instances of a DataSinkTask might call this function at the same time and hence might perform concurrent delete operations on the file system (possibly deleting output files of concurrently running tasks). Since concurrent DataSinkTasks are not aware of each other, coordination of delete and create operations would be difficult.

        Specified by:
        initOutPathLocalFS in interface IFileSystem
        Parameters:
        outPath - Output path that should be prepared.
        writeMode - Write mode to consider.
        createDirectory - True, to initialize a directory at the given path, false to prepare space for a file.
        Returns:
        True, if the path was successfully prepared, false otherwise.
        Throws:
        IOException - Thrown, if any of the file system access operations failed.
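        The rules above form a decision table over three inputs: what currently exists at the path, the write mode, and whether parallel output (createDirectory = true) is requested. The following sketch encodes that table as a pure function that decides what to do without touching any file system; the enums and the method name are hypothetical. The case in which nothing exists for non-parallel output is not spelled out above and is assumed here to require no action.

```java
final class LocalOutPathRules {
    enum WriteMode { NO_OVERWRITE, OVERWRITE }
    enum Existing { NOTHING, FILE, DIRECTORY }
    enum Action { CREATE_DIRECTORY, REUSE_DIRECTORY, REPLACE_FILE_WITH_DIRECTORY, DELETE, NONE, FAIL }

    /** 'parallel' is assumed to correspond to createDirectory == true. */
    static Action decide(Existing existing, WriteMode mode, boolean parallel) {
        if (parallel) {
            switch (existing) {
                case NOTHING:
                    return Action.CREATE_DIRECTORY;
                case DIRECTORY:
                    return Action.REUSE_DIRECTORY; // contained files are NOT deleted
                default: // FILE
                    return mode == WriteMode.OVERWRITE ? Action.REPLACE_FILE_WITH_DIRECTORY : Action.FAIL;
            }
        }
        // Non-parallel output: the path must end up free for a single file.
        if (existing == Existing.NOTHING) {
            return Action.NONE; // assumed: the path is already free
        }
        return mode == WriteMode.OVERWRITE ? Action.DELETE : Action.FAIL;
    }
}
```

Keeping existing directories intact in the parallel case is what lets several concurrent DataSinkTasks call this without deleting each other's output.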
      • initOutPathDistFS

        public boolean initOutPathDistFS(Path outPath,
                                         FileSystem.WriteMode writeMode,
                                         boolean createDirectory)
                                  throws IOException
        Description copied from interface: IFileSystem
        Initializes output directories on distributed file systems according to the given write mode.

        • WriteMode.NO_OVERWRITE & parallel output:
          • A directory is created if the output path does not exist.
          • An existing file or directory raises an exception.
        • WriteMode.NO_OVERWRITE & non-parallel output:
          • An existing file or directory raises an exception.
        • WriteMode.OVERWRITE & parallel output:
          • A directory is created if the output path does not exist.
          • An existing directory and its content are deleted and a new directory is created.
          • An existing file is deleted and replaced by a new directory.
        • WriteMode.OVERWRITE & non-parallel output:
          • An existing file or directory is deleted and replaced by a new directory.

        Specified by:
        initOutPathDistFS in interface IFileSystem
        Parameters:
        outPath - Output path that should be prepared.
        writeMode - Write mode to consider.
        createDirectory - True, to initialize a directory at the given path, false otherwise.
        Returns:
        True, if the path was successfully prepared, false otherwise.
        Throws:
        IOException - Thrown, if any of the file system access operations failed.
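        The main difference from initOutPathLocalFS is that under OVERWRITE an existing output directory is emptied rather than reused, and under NO_OVERWRITE even an existing directory is an error. The sketch below covers only parallel output (createDirectory = true) and uses a local java.nio directory as a stand-in for a distributed file system; DistOutPathSketch is hypothetical.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class DistOutPathSketch {
    enum WriteMode { NO_OVERWRITE, OVERWRITE }

    /** Parallel output only: leaves an empty directory at outPath, or throws. */
    static void prepareOutputDirectory(Path outPath, WriteMode mode) throws IOException {
        if (Files.exists(outPath, LinkOption.NOFOLLOW_LINKS)) {
            if (mode == WriteMode.NO_OVERWRITE) {
                // Unlike the local variant, an existing directory is rejected as well.
                throw new FileAlreadyExistsException(outPath.toString());
            }
            // OVERWRITE: remove the existing file, or the directory with all its content.
            try (Stream<Path> walk = Files.walk(outPath)) {
                List<Path> entries = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
                for (Path p : entries) {
                    Files.delete(p);
                }
            }
        }
        Files.createDirectories(outPath);
    }
}
```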