
Using MRUnit with Multiple Outputs

I recently finished work on a rather large suite of Hadoop jobs, and at some point I realized I could not confidently submit the code without rigorous automated tests to accompany it. The jobs we had written previously at Jana did not necessarily warrant setting up a full testing framework, because they had fairly specific inputs that could be validated well by hand, but here there were simply too many complex, interacting components and too diverse a set of possible inputs to continue that approach. (Our method for manual testing has been to gather appropriate input data, run the job locally with LocalJobRunner, and validate the output ad hoc.)

So I chose MRUnit as a testing tool. MRUnit provides drivers for testing Hadoop mappers and reducers separately and together, and even for simulating a pipeline of jobs by supplying a set of (mapper, reducer) pairs. It is an exciting project that became a top-level Apache project in 2012, but as of v1.0.0 it lacks support for several features we use in our jobs. Still, I wanted to take advantage of the nice functionality it does provide, so I modified the jobs being tested and extended MRUnit as appropriate to achieve compatibility.
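
(For anyone who hasn't used it, a basic reducer test with the old-API ReduceDriver looks roughly like the following; MyReducer is just a stand-in for whatever reducer is under test.)

ReduceDriver<Text, Text, Text, Text> driver = ReduceDriver.newReduceDriver(new MyReducer());
driver.withInput(new Text("k"), Arrays.asList(new Text("a"), new Text("b")))
      .withOutput(new Text("k"), new Text("a"))
      .withOutput(new Text("k"), new Text("b"))
      .runTest();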

In this post I’m just going to focus on how I extended MRUnit to validate output from MultipleOutputs.  It does look like support for MultipleOutputs will be added in MRUnit 1.1.0 for at least the new mapreduce API, but I’m not sure what the timetable on that is, and we are still using the old mapred API anyway.

I was only using MultipleOutputs in the reducer of the job I was testing, so I’ll be talking about ReduceDriver here, but MapDriver could be extended in much the same way.  The behavior I was ultimately looking for was validation of both the normal output from the reducer AND any output from MultipleOutputs.
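
To ground the discussion, the general shape of an old mapred-API reducer that writes to MultipleOutputs looks something like this. It is a simplified stand-in, not the actual job under test, and the "errors" named output is illustrative:

public class ExampleReducer extends MapReduceBase
        implements Reducer<Text, Text, Text, Text> {

    private MultipleOutputs multipleOutputs;

    @Override
    public void configure(JobConf job) {
        multipleOutputs = new MultipleOutputs(job);
    }

    @Override
    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output,
                       Reporter reporter) throws IOException {
        while (values.hasNext()) {
            Text value = values.next();
            output.collect(key, value);                         // normal reducer output
            multipleOutputs.getCollector("errors", reporter)    // named output (illustrative)
                           .collect(key, value);
        }
    }

    @Override
    public void close() throws IOException {
        multipleOutputs.close();
    }
}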

I.

The first step was to create a mock MultipleOutputs class equipped with an overridden getCollector(…) method that returns an implementation of OutputCollectable, MRUnit's internal extension of the OutputCollector interface. Since we only anticipated using (Text, Text) collectors from MultipleOutputs, there was no need to make this code generic. The mock collector, which uses MRUnit's MockOutputCollector as a template, follows. The idea is that the collect(…) method stores copies of the (key, value) pairs it is invoked with, and getOutputs() can then be called to retrieve that data when validation is being done.


import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.internal.io.Serialization;
import org.apache.hadoop.mrunit.internal.output.OutputCollectable;
import org.apache.hadoop.mrunit.types.Pair;

public class MockTextCollector implements OutputCollectable<Text, Text> {

    private final List<Pair<Text, Text>> collectedOutputs;
    private final Serialization serialization;
    private final Text nullTextReplacement;

    public MockTextCollector(final Configuration conf, final Text nullTextReplacement) {
        collectedOutputs = new ArrayList<Pair<Text, Text>>();
        serialization = new Serialization(conf);
        this.nullTextReplacement = nullTextReplacement;
    }

    public MockTextCollector(final Configuration conf) {
        this(conf, new Text(""));
    }

    @Override
    public void collect(final Text key, final Text value) throws IOException {
        // Substitute the placeholder for null keys so MRUnit's Serialization can copy them.
        Text newKey = (key == null) ? nullTextReplacement : key;
        collectedOutputs.add(new Pair<Text, Text>(serialization.copy(newKey),
                                                  serialization.copy(value)));
    }

    @Override
    public List<Pair<Text, Text>> getOutputs() {
        return collectedOutputs;
    }
}

So that’s fairly straightforward. Note that null keys are replaced with placeholders because, otherwise, MRUnit fails while trying to serialize them (it seems the same would be true for values, but that wasn’t a concern here).
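
As a quick sanity check of that behavior, a test of the collector might look like this (a minimal sketch assuming JUnit 4 and a default Hadoop Configuration):

@Test
public void replacesNullKeysWithPlaceholder() throws IOException {
    MockTextCollector collector = new MockTextCollector(new Configuration());
    collector.collect(null, new Text("some value"));

    List<Pair<Text, Text>> outputs = collector.getOutputs();
    assertEquals(new Text(""), outputs.get(0).getFirst());        // placeholder instead of null
    assertEquals(new Text("some value"), outputs.get(0).getSecond());
}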

II.

The next level up is the mock extension of the MultipleOutputs class, which uses MockTextCollector.  The full file can be seen here: MockMultipleTextOutputs.  The key components are the  getCollector(…) method and the getOutput() method, which are also shown below for easy reference.


private Map<String, MockTextCollector> collectors = new HashMap<String, MockTextCollector>();

@Override
public OutputCollector getCollector(String namedOutput, String multiName,
                                    Reporter reporter) throws IOException {

    boolean multi = isMultiNamedOutput(conf, namedOutput);
    String baseFileName = multi ? namedOutput + "_" + multiName : namedOutput;

    // Reuse the collector if this named output has been seen before.
    MockTextCollector existingCollector = collectors.get(baseFileName);
    if (existingCollector != null) {
        return existingCollector;
    }

    // The validation below mirrors the checks in the real MultipleOutputs.getCollector(...).
    checkNamedOutputName(namedOutput);

    boolean foundMatch = false;
    Iterator<String> namedOutputsIter = getNamedOutputs();
    while (namedOutputsIter.hasNext()) {
        if (namedOutputsIter.next().equals(namedOutput)) {
            foundMatch = true;
            break;
        }
    }
    if (!foundMatch) {
        throw new IllegalArgumentException("Undefined named output '" + namedOutput + "'");
    }

    if (!multi && multiName != null) {
        throw new IllegalArgumentException(
                "Name output '" + namedOutput + "' has not been defined as multi");
    }
    if (multi) {
        checkTokenName(multiName);
    }

    // First time this named output is requested: create, remember, and return a new collector.
    MockTextCollector mockTextCollector = new MockTextCollector(conf, nullTextReplacement);
    collectors.put(baseFileName, mockTextCollector);
    return mockTextCollector;
}

public TreeMap<String, List<Pair<Text, Text>>> getOutput() {
    TreeMap<String, List<Pair<Text, Text>>> result =
            new TreeMap<String, List<Pair<Text,Text>>>();
    for (Map.Entry<String, MockTextCollector> entry : collectors.entrySet()) {
        result.put(entry.getKey(), entry.getValue().getOutputs());
    }
    return result;
}

When getCollector(…) is called for a new named output and all validation passes, a fresh MockTextCollector is created, stored in the map of output names to collectors, and returned. When getCollector(…) is called for a named output already present in the map, the corresponding collector is simply retrieved and returned. Finally, when it's time to retrieve all of the output from the various named outputs for validation, getOutput() iterates through the accumulated map of names to collectors and assembles a map from each name to the outputs reported by that collector's getOutputs() method. The class also contains a few private methods copied verbatim from MultipleOutputs to allow the override of getCollector(…).

III.

The last and most important component is the extension of ReduceDriver, which can be seen in full here: MultipleTextOutputsReduceDriver. The key pieces are the two validate(…) methods. The first is exactly the same as the version ReduceDriver normally inherits from TestDriver, except that it is generic (which also required making generic versions of several functions it calls). That genericity lets us run the same validate(…) on both the normal reducer output and the results from each of the multiple outputs. To the latter end, the second validate(…) simply delegates to the first, invoking it on the lists of received and expected outputs for each named output:


protected <K, V> void validate(TreeMap<String, List<Pair<K, V>>> outputs,
                               TreeMap<String, List<Pair<K, V>>> expectedOutputs) {

    HashSet<String> expectedAndIgnoredOutputs = new HashSet<String>(expectedOutputs.keySet());
    expectedAndIgnoredOutputs.addAll(ignoredNamedOutputs);
    if (!outputs.keySet().equals(expectedAndIgnoredOutputs)) {
        String msg = "Actual and (required + ignored) named outputs do not match.\n"
                     + "RECEIVED: " + outputs.keySet() + "\n"
                     + "REQUIRED: " + expectedOutputs.keySet() + "\n"
                     + "IGNORED: " + ignoredNamedOutputs;
        LOG.error(msg);
        Assert.fail(msg);
    } else {
        Iterator<Entry<String, List<Pair<K, V>>>> iter1 = outputs.entrySet().iterator();
        Iterator<Entry<String, List<Pair<K, V>>>> iter2 =
                expectedOutputs.entrySet().iterator();
        while (iter1.hasNext()) {
            Entry<String, List<Pair<K, V>>> entry1 = iter1.next();
            if (!ignoredNamedOutputs.contains(entry1.getKey())) {
                LOG.info("Validating " + entry1.getKey());
                validate(entry1.getValue(), iter2.next().getValue(), true);
                LOG.info("Validation succeeded for " + entry1.getKey());
            }
        }
    }
}

Simple methods are provided for specifying the ignored named outputs and the expected results for those we do want to validate. The actual received results are retrievable internally because the MockMultipleTextOutputs instance used is an attribute of MultipleTextOutputsReduceDriver, set with registerMultipleOutputs(…), so we can simply call getOutput() on that object after a run. The results are then cleared before subsequent runs.
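
For illustration, the wiring in a test might look roughly like the following. Only registerMultipleOutputs(…) is named above; the mock's constructor and the other method names here are hypothetical stand-ins (the gist linked at the end shows the real ones):

// Illustrative wiring only; aside from registerMultipleOutputs(...), these method names
// (and the mock's constructor) are hypothetical stand-ins.
MockMultipleTextOutputs mockMultipleOutputs =
        new MockMultipleTextOutputs(driver.getConfiguration());       // assumed constructor
driver.registerMultipleOutputs(mockMultipleOutputs);
driver.addExpectedMultipleOutput("errors",                            // hypothetical
        new Pair<Text, Text>(new Text("bad-record"), new Text("details")));
driver.ignoreNamedOutput("debug");                                     // hypothetical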

The two functions inherited from TestDriver for actually running a test, runTest() and run(), are also tweaked. The modifications to runTest() are straightforward: I've simply added optional validation of the MultipleOutputs results after validation of the normal output. run() is left largely the same except for the injection of the MockMultipleTextOutputs instance into the actual reducer under test. This is done via reflection in one of the more interesting pieces of code here. Reflection is a wonderful tool for dependency injection because it is minimally disruptive to the code under test (potentially requiring no modifications!) and its lackluster performance is not typically a worry in testing. The function that performs the injection can be seen here: ReflectUtils (a rough sketch of the idea also follows the run() listing below). And here is run(), complete with the relevant invocation, for quick reference:


@Override
public List<Pair<K2, V2>> run() throws IOException {
    try {
        preRunChecks(getReducer());
        initDistributedCache();
        final OutputCollectable<K2, V2> outputCollectable =
                mockOutputCreator.createMapredOutputCollectable(
                        getConfiguration(), getOutputSerializationConfiguration());
        final MockReporter reporter =
                new MockReporter(MockReporter.ReporterType.Reducer, getCounters());

        ReflectionUtils.setConf(getReducer(), new JobConf(getConfiguration()));

        mockMultipleOutputs.clear();

        String resultMsg = ReflectUtils.injectMockByClass(MultipleOutputs.class, getReducer(),
                                                          mockMultipleOutputs);
        LOG.info(resultMsg);

        for (Pair<K1, List<V1>> kv : inputs) {
            getReducer().reduce(kv.getFirst(), kv.getSecond().iterator(),
                                outputCollectable, reporter);
        }
        getReducer().close();
        return outputCollectable.getOutputs();
    } finally {
        cleanupDistributedCache();
    }
}
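
For completeness, here is a rough sketch of what a reflective injection helper along those lines could look like. This is an illustration of the idea only, not the actual ReflectUtils implementation linked above:

// Illustrative only: walk the target's class hierarchy, find the first field whose declared
// type is compatible with the given class, and swap in the mock.
public static String injectMockByClass(final Class<?> fieldClass, final Object target,
                                       final Object mock) {
    for (Class<?> clazz = target.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
        for (java.lang.reflect.Field field : clazz.getDeclaredFields()) {
            if (fieldClass.isAssignableFrom(field.getType())) {
                field.setAccessible(true);
                try {
                    field.set(target, mock);
                } catch (IllegalAccessException e) {
                    throw new RuntimeException(e);
                }
                return "Injected " + mock.getClass().getSimpleName()
                        + " into field '" + field.getName() + "' of "
                        + target.getClass().getSimpleName();
            }
        }
    }
    return "No field of type " + fieldClass.getSimpleName() + " found on "
            + target.getClass().getSimpleName();
}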

 

And that’s it!  The only other requirement is to register named outputs with the JobConf in the test setup, just as one would in the job under test.  Here is a final link to a trimmed-down example of a test using all of this: https://gist.github.com/nowell-jana/11275829.
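
For instance, that registration in a test's setup might look something like this, using the standard static helpers on the old mapred MultipleOutputs (the output names are illustrative, and the JobConf shown is the one later handed to the driver and the mock):

// conf is the JobConf that is later handed to the driver and the mock;
// the output names "errors" and "debug" are illustrative.
JobConf conf = new JobConf();
MultipleOutputs.addNamedOutput(conf, "errors", TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addMultiNamedOutput(conf, "debug", TextOutputFormat.class, Text.class, Text.class);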

Comments/suggestions are welcome.

– Nowell Closser

 
