Source code for dataset_loading.core

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import queue
import threading
import random
from PIL import Image
import time
import os
import warnings

__all__ = ['ImgQueue', 'FileQueue', 'ImgLoader', 'FileQueueNotStarted',
           'FileQueueDepleted', 'ImgQueueNotStarted']

EPOCHS_TO_PUT = 10
FILEQUEUE_SLEEPTIME = 5
FILEQUEUE_BLOCKTIME = 1
IMGQUEUE_BLOCKTIME = 3


class ImgQueue(queue.Queue):
    """A queue to hold images

    This queue can hold images which will be loaded from the main program.
    Multiple file reader threads can fill up this queue as needed to make
    sure demand is met.

    Each entry in the image queue will then be a tuple of (data, label). If
    the data is loaded using a filename queue and image loader threads and a
    label is not provided, each queue item will still be a tuple, only the
    label will be None. If you don't want to return this label, then you can
    set the nolabel input to the start_loaders function.

    To get a batch of samples from the ImgQueue, see the
    :py:meth:`get_batch` method.

    If you are lucky enough to have an entire dataset that fits easily into
    memory, you won't need to use multiple threads to start loading data.
    You may however want to keep the same interface. In this case, you can
    call the :py:meth:`take_dataset` method with the dataset and labels, and
    then call the :py:meth:`get_batch` method in the same manner.

    Illustrative usage sketches (the ``_example_*`` functions) are included
    further down this module.

    Parameters
    ----------
    maxsize : positive int
        Maximum number of images to hold in the queue. Must not be 0 or else
        it will keep filling up until you run out of memory.
    name : str
        Queue name

    Raises
    ------
    ValueError if the maxsize parameter is incorrect.
    """
    def __init__(self, maxsize=1000, name=''):
        if maxsize <= 0:
            raise ValueError('The Image Queue needs to have a maximum ' +
                             'size or you may run out of memory.')
        queue.Queue.__init__(self, maxsize=maxsize)
        self._epoch_size = None
        self._read_count = 0
        self._epoch_count = 0
        self._last_batch = False
        self.in_memory = False
        self.name = name
        self.logging_on = False
        self._kill = False
        self.file_queue = None
        self.loaders_alive = []
        self.loaders_started = False

    def __repr__(self):
        def bool2str(x):
            return "yes" if x else "no"
        return ("ImgQueue instance - {}.\n".format(self.name) +
                "Loaders started: {}\n".format(bool2str(self.loaders_started)) +
                "Loaders done: {}\n".format(bool2str(self.loaders_finished)) +
                "Dataset in mem: {}\n".format(bool2str(self.in_memory)) +
                "Read count: {}\n".format(self.read_count) +
                "Epoch size: {}\n".format(self.epoch_size))

    @property
    def last_batch(self):
        """Check whether the previously read batch was the last batch in the
        epoch.

        **Reading this value will set it to False**. This allows you to do
        something like this::

            while True:
                while not train_queue.last_batch:
                    data, labels = train_queue.get_batch(batch_size)
                    ...
        """
        test = self._last_batch
        if test:
            self._last_batch = False
        return test

    @property
    def epoch_size(self):
        """The epoch size (as interpreted from the File Queue)"""
        if self.file_queue is not None:
            return self.file_queue.epoch_size
        else:
            return -1

    @property
    def read_count(self):
        """Returns how many images have been read from this queue."""
        return self._read_count

    @property
    def epoch_count(self):
        """Returns what epoch we are currently at"""
        return self._epoch_count

    @property
    def img_shape(self):
        """Return what the image shape is of the images in the queue

        This may be useful to check the output shape after any preprocessing
        has been done.

        Returns
        -------
        img_size : list of ints or None
            Returns the shape of the images in the queue or None if it could
            not determine what they were.
        """
        try:
            return self.queue[0][0].shape
        except (IndexError, AttributeError):
            return None

    @property
    def killed(self):
        """Returns True if the queue has been asked to die."""
        return self._kill

    @property
    def label_shape(self):
        """Return what the label shape is of the labels in the queue

        This may be useful to check the output shape after any preprocessing
        has been done.

        Returns
        -------
        label_shape : list of ints or None
            Returns the shape of the labels in the queue or None if it could
            not determine what they were.
        """
        try:
            return self.queue[0][1].shape
        except (IndexError, AttributeError):
            return None

    @property
    def filling(self):
        """Returns True if the image queue is still being filled"""
        return (True in self.loaders_alive)

    @property
    def loaders_finished(self):
        return self.loaders_started and not self.filling

    def join(self):
        """Method to signal any threads that are filling this queue to stop.

        Threads will clean themselves up if the epoch limit is reached, but
        in case you want to kill them manually before that, you can signal
        them to stop here. Note that if these threads are blocked waiting on
        input, they will still stay alive (and blocked) until whatever is
        blocking them frees up. This shouldn't be a problem though, as they
        will not be taking up any processing power.

        If there is a file queue associated with this image queue, those
        threads will be stopped too.

        Note: Overloads the queue join method which normally blocks until
        the queue has been emptied. This will return even if the queue has
        data in it.
        """
        # Kill the file queue
        if self.file_queue is not None:
            self.file_queue.join()

        # Tell the loaders to stop filling the queue
        self._kill = True

        # Empty the queue once through - note that the loaders may still
        # fill it up afterwards, but we want to just stop them blocking
        # trying to put into a full queue
        while self.filling:
            try:
                self.get_nowait()
            except queue.Empty:
                time.sleep(0.01)

        for loader in self.loaders:
            loader.join()

    def start_loaders(self, file_queue, num_threads=3, img_dir=None,
                      img_size=None, transform=None):
        """Starts the threads to load the images into the ImgQueue

        Parameters
        ----------
        file_queue : FileQueue object
            An instance of the file queue
        num_threads : int
            How many parallel threads to start to load the images
        img_dir : str
            Offset to add to the strings fetched from the file queue so that
            a call to load the file in will succeed.
        img_size : tuple of (height, width) or None
            What size to resize all the images to. If None, no resizing will
            be done.
        transform : function handle or None
            Pre-filtering operation to apply to the images before adding to
            the Image Queue. If None, no operation will be applied.
            Otherwise, it has to be a function handle that takes a numpy
            array and returns the transformed image as a numpy array.

        Raises
        ------
        ValueError: if called after take_dataset.
        """
        if self.in_memory:
            raise ValueError(
                "You have already called take_dataset for this Image Queue, " +
                "which loaded the images into memory. You cannot start " +
                "threads to load from a file queue afterwards.")

        self.file_queue = file_queue
        self.loaders_alive = [True] * num_threads
        loaders = [
            ImgLoader(i, file_queue, self, img_dir=img_dir,
                      img_size=img_size, transform=transform)
            for i in range(num_threads)
        ]
        for loader in loaders:
            loader.start()
        self.loaders_started = True
        self.loaders = loaders

    def take_dataset(self, data, labels=None, shuffle=True, num_threads=1,
                     transform=None, max_epochs=float('inf')):
        """Save the image dataset to the class for feeding back later.

        If we don't need a file queue (we have all the dataset in memory),
        we can give it to the ImgQueue class with this method. Images will
        still flow through the queue (so you still need to be careful about
        how big to set the queue's maxsize), but now the preprocessed images
        will be fed into the queue, ready to retrieve quickly by the main
        program.

        Parameters
        ----------
        data : ndarray of floats
            The images. Should be in the form your main program is happy to
            receive them in, as no reshaping will be done. For example, if
            the data is of shape [10000, 32, 32, 3], then we randomly sample
            from the zeroth axis when we call get_batch.
        labels : ndarray numeric or None
            The labels. If not None, the zeroth axis has to match the size
            of the data array. If None, then no labels will be returned when
            calling get_batch.
        shuffle : bool
            Normally the ordering would be done in the file queue; as we are
            skipping this, the ordering has to be done here. Set this to
            True if you want to receive samples randomly from data.
        num_threads : int
            How many threads to start to fill up the image queue with the
            preprocessed data.
        transform : None or callable
            Transform to apply to images. Should accept a single image
            (although it isn't fussy about what size/shape it is in), and
            return a single image. This will be applied to all the images
            independently before putting them in the Image Queue.
        max_epochs : int or float('inf')
            Maximum number of epochs to serve before the loaders stop.

        Notes
        -----
        Even if the shuffle input is set to False, that doesn't necessarily
        mean that all images in the image queue will be in the same order
        across epochs. For example, thread A may pull the first 100 images
        from the list and then thread B gets the second 100. If thread A
        takes slightly longer to process the images than thread B, they will
        get inserted into the Image Queue afterwards. Trying to synchronize
        across both queues could be done, but it would add unnecessary
        complications and overhead.

        Raises
        ------
        AssertionError if data and labels don't match up in size.
        """
        # Flag that the dataset now lives in memory (checked by
        # start_loaders).
        self.in_memory = True

        # Create a file queue. This will only contain indices into the numpy
        # arrays data and labels.
        self.file_queue = FileQueue()
        files = list(range(data.shape[0]))
        self.file_queue.load_epochs(files, shuffle=shuffle,
                                    max_epochs=max_epochs)

        loaders = []
        self.loaders_alive = [True] * num_threads
        for i in range(num_threads):
            thread = threading.Thread(
                target=_mini_loader, name='Mini Loader Thread',
                kwargs={'idx': i, 'fq': self.file_queue, 'iq': self,
                        'data': data, 'labels': labels,
                        'transform': transform},
                daemon=True)
            thread.start()
            loaders.append(thread)
        self.loaders_started = True
        self.loaders = loaders

    def get(self, block=True, timeout=None):
        """Get a single item from the Image Queue"""
        if not self.loaders_started:
            raise ImgQueueNotStarted(
                "Start the Image Queue Loaders by calling start_loaders " +
                "before calling get")
        else:
            data = super(ImgQueue, self).get(block, timeout)
            self._read_count += 1
            return data

    def get_batch(self, batch_size, timeout=IMGQUEUE_BLOCKTIME):
        """Tries to get a batch from the Queue.

        If there are fewer than a batch of images, it will grab them all. If
        the epoch size was set and the tracking counter sees there are fewer
        than <batch_size> images until we hit an epoch, then it will cap the
        number of images grabbed to reach the epoch.

        Parameters
        ----------
        batch_size : int
            How many samples we want to get.
        timeout : float
            How long (in seconds) to wait before timing out.

        Returns
        -------
        data : list of ndarray
            List of numpy arrays representing the transformed images.
        labels : list of ndarray or None
            List of labels. Will be None if there were no labels in the
            FileQueue.

        Notes
        -----
        When we pull the last batch from the image queue, the property
        last_batch is set to True. This allows the calling function to
        synchronize tests with the end of an epoch.

        Raises
        ------
        FileQueueNotStarted
            When trying to get a batch but the file queue manager hasn't
            started.
        FileQueueDepleted
            When we have hit the epoch limit.
        ImgQueueNotStarted
            When trying to get a batch but no image loaders have started.
        queue.Empty
            If timed out on trying to read an image.
        """
        if not self.loaders_started:
            raise ImgQueueNotStarted(
                "Start the Image Queue Loaders by calling start_loaders " +
                "before calling get")
        else:
            return self._get_batch(batch_size, timeout)

    def _get_batch(self, batch_size, timeout=None):
        # Determine some limits on how many images to grab.
        rem = batch_size
        if self.epoch_size is not None:
            rem = self.epoch_size - (self._read_count % self.epoch_size)
        nsamples = min(rem, batch_size)

        start = time.time()
        data = []
        for i in range(nsamples):
            try:
                item = self.get(block=True, timeout=timeout)
            except queue.Empty:
                # Allow some time for the file queues to definitely finish
                # before deciding why the image queue was empty.
                time.sleep(FILEQUEUE_BLOCKTIME)
                if self.loaders_finished:
                    raise FileQueueDepleted("No more images left")
                else:
                    raise ValueError(
                        'Queue Empty Exception but File Queue is still ' +
                        'active. Maybe the image loaders are under heavy ' +
                        'load?')
            data.append(item)
        end = time.time()

        if self.epoch_size is not None:
            if self._read_count // self.epoch_size > self._epoch_count:
                self._last_batch = True
                self._epoch_count = self._read_count // self.epoch_size

        if self.logging_on:
            self._update_logger_info(end - start)

        # Unzip the data and labels before returning
        labels = [x[1] for x in data]
        data = [x[0] for x in data]
        if labels[0] is None:
            return data, None
        else:
            return data, labels

    def add_logging(self, writer, write_period=10):
        """Adds the ability to monitor queue sizes and fetch times.

        Will try to import tensorflow and issue a warnings.warn if it
        couldn't.

        Parameters
        ----------
        writer : tensorflow FileWriter object
            Uses this object to write out summaries.
        write_period : int
            After how many calls to get_batch should we write to the logger.
        """
        try:
            from dataset_loading.tensorboard_logging import Logger
        except ImportError:
            warnings.warn("Sorry, I couldn't import the necessary modules")
            return

        self.logger = Logger(writer=writer)
        self.write_period = write_period
        self.call_count = 0
        self.logging_on = True
        self.logger_info = {
            'call_count': 0,
            'av_qsize': 0,
            'av_fetch_time': 0,
            'epoch_idx': 0,
            'epoch_size': 0,
            'epoch_qsize': np.zeros((10000,)),
            'epoch_fetch_time': np.zeros((10000,))
        }

    def _update_logger_info(self, fetch_time):
        idx = self.logger_info['call_count']
        idx += 1
        self.logger_info['call_count'] = idx

        # The summary scalars
        self.logger_info['av_qsize'] += self.qsize()
        self.logger_info['av_fetch_time'] += fetch_time
        if idx % self.write_period == 0:
            qsize = self.logger_info['av_qsize'] / self.write_period
            fetch_times = self.logger_info['av_fetch_time'] / self.write_period
            self.logger.log_scalar('queues/{}/fetch_time'.format(self.name),
                                   fetch_times, idx)
            self.logger.log_scalar('queues/{}/qsize'.format(self.name),
                                   qsize, idx)
            self.logger_info['av_qsize'] = 0
            self.logger_info['av_fetch_time'] = 0

        # The summary histograms
        # If we fill up the buffer, wrap around to the start and write at
        # the beginning
        i = self.logger_info['epoch_idx']
        size = self.logger_info['epoch_size']
        i += 1
        size += 1
        if i % self.logger_info['epoch_qsize'].shape[0] == 0:
            i = 0
            size -= 1
        self.logger_info['epoch_qsize'][i] = self.qsize()
        self.logger_info['epoch_fetch_time'][i] = fetch_time
        if self._last_batch:
            self.logger.log_histogram(
                'queues/{}/fetch_time'.format(self.name),
                self.logger_info['epoch_fetch_time'][:size], idx, bins=1000)
            self.logger.log_histogram(
                'queues/{}/qsize'.format(self.name),
                self.logger_info['epoch_qsize'][:size], idx, bins=1000)
            i = 0
            size = 0
        self.logger_info['epoch_idx'] = i
        self.logger_info['epoch_size'] = size

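
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the library API): one way to wire a
# FileQueue to an ImgQueue with start_loaders and pull batches until the
# epoch limit is hit. The (filename, label) list and the './images' directory
# are placeholder assumptions for demonstration only.
# ---------------------------------------------------------------------------
def _example_file_based_pipeline():
    # Hypothetical list of (filename, label) tuples
    files = ([('cat_{}.jpg'.format(i), 0) for i in range(100)] +
             [('dog_{}.jpg'.format(i), 1) for i in range(100)])

    file_queue = FileQueue()
    file_queue.load_epochs(files, shuffle=True, max_epochs=2)

    img_queue = ImgQueue(maxsize=500, name='train')
    img_queue.start_loaders(file_queue, num_threads=3, img_dir='./images',
                            img_size=(64, 64))

    try:
        while True:
            # get_batch caps a batch so it never crosses an epoch boundary
            data, labels = img_queue.get_batch(32)
            if img_queue.last_batch:
                print('Epoch {} finished'.format(img_queue.epoch_count))
    except FileQueueDepleted:
        # Raised once max_epochs worth of images have been served
        pass
    finally:
        img_queue.join()
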
def _mini_loader(idx, fq, iq, data, labels, transform):
    """Image Queue filler for when we have a dataset provided.

    If the dataset is small enough to fit into memory and we don't need file
    queues, we can use an image queue and a simpler multithreaded function
    to load it. In this case, we make a mock file queue, which is just
    filled with indices into the np array of images.
    """
    if labels is not None:
        assert data.shape[0] == labels.shape[0]

    while not iq.killed:
        # Try to get an item from the (mock) file queue
        have_data = False
        try:
            item = fq.get(block=True, timeout=FILEQUEUE_BLOCKTIME)
            have_data = True
        except queue.Empty:
            # If the file queue ran out, exit quietly
            if not fq.filling:
                break
        except OSError:
            # Perhaps the file queue has been shut?
            break

        if have_data:
            # The mock file queue holds plain indices; unwrap a tuple if one
            # slipped through.
            if isinstance(item, tuple):
                item = item[0]

            # 'Load' the image and label - apply the transform if necessary
            img = data[item]
            if transform is not None:
                img = transform(img)
            if labels is not None:
                label = labels[item]
            else:
                label = None

            # Put it into my queue.
            iq.put((img, label))

    iq.loaders_alive[idx] = False

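
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the library API): using take_dataset when
# the whole dataset already fits in memory. The random arrays stand in for a
# real dataset, and the mean-centring transform is an example choice.
# ---------------------------------------------------------------------------
def _example_in_memory_pipeline():
    data = np.random.rand(1000, 32, 32, 3).astype(np.float32)
    labels = np.random.randint(0, 10, size=(1000,))

    img_queue = ImgQueue(maxsize=200, name='in_memory')
    img_queue.take_dataset(data, labels, shuffle=True, num_threads=2,
                           transform=lambda x: x - x.mean(),
                           max_epochs=1)

    # Pull batches until the epoch boundary is reached
    while not img_queue.last_batch:
        batch_data, batch_labels = img_queue.get_batch(64)
    img_queue.join()
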
class FileQueue(queue.Queue):
    """A queue to hold filename strings

    This queue is used to indicate what order of jpeg files should be read.
    It may also be a good idea to put the class label alongside the filename
    as a tuple, so the main program can get access to both of these at the
    same time.

    Create the class, and then call the load_epochs() method to start a
    thread to manage the queue and refill it as it gets low.

    The maxsize is not provided as an option as we want the queue to be able
    to take entire epochs and not be restricted on the upper limit by a
    maxsize. Memory use should be no problem as the queue entries are only
    small (filename strings or integer indices).
    """
    def __init__(self, maxsize=0):
        queue.Queue.__init__(self, maxsize=maxsize)
        self._epoch_size = -1
        self.thread = None
        # Flags for the ImgQueue
        self.started = False
        self._kill = False
        self.loader_alive = False
        self._epoch_count = -1

    @property
    def epoch_count(self):
        """The current epoch count"""
        return self._epoch_count

    @epoch_count.setter
    def epoch_count(self, val):
        """Update the epoch count"""
        self._epoch_count = val

    @property
    def killed(self):
        """Returns True if the queue has been asked to die"""
        return self._kill

    @property
    def filling(self):
        """Returns True if the file queue is being filled"""
        return self.loader_alive

    @property
    def epoch_size(self):
        """Gives the size of one epoch of data"""
        return self._epoch_size

    def get(self, block=True, timeout=None):
        """Get a single item from the File Queue"""
        if not self.started:
            raise FileQueueNotStarted(
                "Start the File Queue manager by calling " +
                "FileQueue.load_epochs before calling get_batch")
        else:
            return super(FileQueue, self).get(block, timeout)

    def _depleted(self):
        raise FileQueueDepleted('End of Training samples')

    def join(self):
        """Method to signal any threads that are filling this queue to stop.

        Threads will clean themselves up if the epoch limit is reached, but
        in case you want to kill them manually before that, you can signal
        them to stop here.

        Note: Overloads the queue join method which normally blocks until
        the queue has been emptied. This will return even if the queue has
        data in it.
        """
        # Tell the loader to stop filling the queue
        self._kill = True
        time.sleep(1)

        # Ensure the file loader isn't blocking by eating up the remaining
        # data
        while self.loader_alive:
            try:
                self.get_nowait()
            except queue.Empty:
                time.sleep(0.01)

        self.thread.join()

    def load_epochs(self, files, shuffle=True, max_epochs=float('inf')):
        """Starts a thread to load the file names into the file queue.

        Parameters
        ----------
        files : list
            Can either be a list of filename strings or a list of tuples of
            (filenames, labels)
        shuffle : bool
            Whether to shuffle the list before adding it to the queue.
        max_epochs : int or infinity
            Maximum number of epochs to allow before the queue manager stops
            refilling the queue.

        Notes
        -----
        Even if the shuffle input is set to False, that doesn't necessarily
        mean that all images in the image queue will be in the same order
        across epochs. For example, thread A may pull the first image from
        the list and then thread B gets the second one. If thread A takes
        slightly longer to read in the image than thread B, it gets inserted
        into the Image Queue afterwards. Trying to synchronize across both
        queues could be done, but it would add unnecessary complications and
        overhead.

        Raises
        ------
        ValueError - If the files list was empty.
        """
        if len(files) == 0:
            raise ValueError('The files list cannot be empty')

        # Limit ourselves to only one thread for the file queue
        if self.thread is None:
            myfiles = files[:]
            self.thread = threading.Thread(
                target=file_loader, name='File Queue Thread',
                kwargs={'files': myfiles, 'fq': self, 'shuffle': shuffle},
                daemon=True)
            self.max_epochs = max_epochs
            self._epoch_count = 0
            self._epoch_size = len(files)
            self.loader_alive = True
            self.started = True
            self.thread.start()

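
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the library API): a FileQueue used on its
# own. load_epochs keeps topping the queue up (reshuffling each epoch) until
# max_epochs is reached. The filenames are placeholders.
# ---------------------------------------------------------------------------
def _example_file_queue_only():
    files = ['img_{:04d}.jpg'.format(i) for i in range(5000)]
    fq = FileQueue()
    fq.load_epochs(files, shuffle=True, max_epochs=5)

    # Consume a few filenames directly - normally ImgLoader threads do this
    first_ten = [fq.get(timeout=FILEQUEUE_BLOCKTIME) for _ in range(10)]
    print(first_ten, fq.epoch_size)
    fq.join()
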
def file_loader(files, fq, shuffle=True):
    """Function to fill up the file queue.

    This tops up the file queue whenever it gets low.
    """
    while fq.epoch_count < fq.max_epochs and not fq.killed:
        if fq.qsize() < 0.5 * len(files) and fq.epoch_count < fq.max_epochs:
            epochs_to_put = min(
                EPOCHS_TO_PUT, fq.max_epochs - fq.epoch_count)
            # Load multiple epochs in at a time
            for i in range(epochs_to_put):
                if shuffle:
                    random.shuffle(files)
                for item in files:
                    fq.put(item)
                fq.epoch_count += 1
        else:
            time.sleep(FILEQUEUE_SLEEPTIME)
    fq.loader_alive = False


class ImgLoader(threading.Thread):
    """A thread to load in images from a filename queue into an image queue.

    This must be instantiated with a file queue (to read from) and an image
    queue (to load into). If you call the :py:meth:`ImgQueue.join` method,
    the kill signal will be passed on to this thread and it will die
    gracefully.
    """
    def __init__(self, idx, file_queue, img_queue, img_size=None,
                 img_dir=None, transform=None):
        threading.Thread.__init__(self, daemon=True)
        self.idx = idx
        self.fqueue = file_queue
        self.iqueue = img_queue
        self.img_size = img_size
        if img_dir is not None:
            self.base_dir = img_dir
        else:
            self.base_dir = ''
        self.transform = transform

    def _load_image(self, im=''):
        """Load an image in and return it as a numpy array."""
        img = Image.open(im)
        if self.img_size is not None:
            img = img.resize(self.img_size)
        # Make sure it is 3 channel
        img = img.convert(mode='RGB')
        img_np = np.array(img).astype(np.float32)
        if self.transform is not None:
            img_np = self.transform(img_np)
        return img_np

    def run(self):
        # Check the image queue's kill signal. So long as this is down, keep
        # running.
        while not self.iqueue.killed:
            # Try to get an item - the file queue running out is the main
            # way for this thread to exit.
            have_data = False
            try:
                item = self.fqueue.get(block=True,
                                       timeout=FILEQUEUE_BLOCKTIME)
                have_data = True
            except queue.Empty:
                # If the file queue timed out and there's nothing filling
                # it - exit
                if not self.fqueue.filling:
                    break

            # Split the item into a filename and label
            if have_data:
                try:
                    f, label = item
                except (TypeError, ValueError):
                    f = item
                    label = None
                img = self._load_image(os.path.join(self.base_dir, f))
                self.iqueue.put((img, label))

        self.iqueue.loaders_alive[self.idx] = False


def catch_empty(func, handle=lambda e: e, *args, **kwargs):
    """Returns the Empty exception rather than raising it.

    Useful for calling queue.get in a list comprehension.
    """
    try:
        return func(*args, **kwargs)
    except queue.Empty as e:
        return handle(e)

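
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the library API): a transform callable for
# ImgLoader / ImgQueue.start_loaders. It receives the image as a float32
# numpy array (after any resize) and must return a numpy array. The [0, 1]
# scaling and random flip below are example choices, not library defaults.
# ---------------------------------------------------------------------------
def _example_transform(img):
    # Scale from [0, 255] to [0, 1]
    img = img / 255.0
    # Random horizontal flip as a simple augmentation
    if random.random() < 0.5:
        img = img[:, ::-1, :]
    return img
    # Usage: img_queue.start_loaders(file_queue, transform=_example_transform)
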
class ImgQueueNotStarted(Exception):
    """Exception raised when trying to pull from an Image Queue that hasn't
    had its feeders started."""
    def __init__(self, value):
        self.value = value

    def __str__(self):
        return repr(self.value)


class FileQueueNotStarted(Exception):
    """Exception raised when trying to pull from a File Queue that hasn't
    had its manager started."""
    def __init__(self, value):
        self.value = value

    def __str__(self):
        return repr(self.value)


class FileQueueDepleted(Exception):
    """Exception raised when the file queue has been depleted.

    Will be raised when the epoch limit is reached."""
    def __init__(self, value):
        self.value = value

    def __str__(self):
        return repr(self.value)
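

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the library API): hooking an ImgQueue up
# to TensorBoard monitoring via add_logging. This assumes TensorFlow 1.x is
# installed (for tf.summary.FileWriter) along with the optional
# dataset_loading.tensorboard_logging module; add_logging simply warns and
# returns if the import fails. The log directory is a placeholder.
# ---------------------------------------------------------------------------
def _example_logging(img_queue):
    import tensorflow as tf
    writer = tf.summary.FileWriter('/tmp/queue_logs')
    # Write average queue size and fetch time every 20 calls to get_batch
    img_queue.add_logging(writer, write_period=20)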